diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java index 15c31c0940da..b41559a38745 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java @@ -17,12 +17,15 @@ package org.apache.dolphinscheduler.server.master.integration; +import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -39,6 +42,9 @@ public class Repository { @Autowired private TaskInstanceDao taskInstanceDao; + @Autowired + private AlertDao alertDao; + /** * Return the list of process instances for a given workflow definition in ascending order of their IDs. */ @@ -87,4 +93,25 @@ public List queryAllTaskInstance() { return taskInstanceDao.queryAll(); } + public List queryAlert(final Integer workflowInstanceId) { + return alertDao.listAlerts(workflowInstanceId); + } + + /** + * Return the list of alerts for all workflow instances of the given workflow definition, + * sorted in ascending order by alert ID. + */ + public List queryAlert(final WorkflowDefinition workflowDefinition) { + List workflowInstances = queryWorkflowInstance(workflowDefinition); + + if (workflowInstances.isEmpty()) { + return Collections.emptyList(); + } + + return workflowInstances.stream() + .map(WorkflowInstance::getId) + .flatMap(id -> this.queryAlert(id).stream()) + .sorted(Comparator.comparingInt(Alert::getId)) + .collect(Collectors.toList()); + } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java index a2f0331f3483..db5a1bbece75 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java @@ -20,9 +20,11 @@ import static com.google.common.truth.Truth.assertThat; import static org.awaitility.Awaitility.await; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; @@ -1435,4 +1437,191 @@ void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runF }); masterContainer.assertAllResourceReleased(); } + + @Test + @DisplayName("Test start a workflow which contains a task with timeout warn strategy") + public void testStartWorkflow_withTimeoutWarnTask() { + final String yaml = "/it/start/workflow_with_timeout_warn_task.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition parentWorkflow = context.getOneWorkflow(); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(parentWorkflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + long expectedProjectCode = parentWorkflow.getProjectCode(); + long expectedWorkflowCode = parentWorkflow.getCode(); + + // Wait for the timeout to occur and alert to be sent (timeout + some buffer time) + await() + .atMost(Duration.ofSeconds(90)) + .untilAsserted(() -> { + // Check if the task instance has reached the expected state + Assertions + .assertThat(repository.queryTaskInstance(workflowInstanceId)) + .hasSize(1) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("warn_task_with_timeout_alert"); + assertThat(taskInstance.getState()).isNotEqualTo(TaskExecutionStatus.KILL); + }); + + // Check that the timeout alert was sent + List alerts = repository.queryAlert(workflowInstanceId); + System.out.println( + "🔍 Found " + alerts.size() + " alert(s) for workflowInstanceId=" + workflowInstanceId); + if (!alerts.isEmpty()) { + Alert alert = alerts.get(0); + System.out.println("📄 Alert details:"); + System.out.println(" - id: " + alert.getId()); + System.out.println(" - projectCode: " + alert.getProjectCode()); + System.out.println(" - workflowDefinitionCode: " + alert.getWorkflowDefinitionCode()); + System.out.println(" - alertType: " + alert.getAlertType()); + System.out.println(" - content: " + alert.getContent()); + System.out.println(" - createTime: " + alert.getCreateTime()); + } + + // Now do assertions + Assertions + .assertThat(alerts) + .hasSize(1) + .satisfiesExactly(alert -> { + assertThat(alert.getProjectCode()).isEqualTo(expectedProjectCode); + assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(expectedWorkflowCode); + assertThat(alert.getAlertType()).isEqualTo(AlertType.TASK_TIMEOUT); + }); + }); + + masterContainer.assertAllResourceReleased(); + } + + @Test + @DisplayName("Test timeout WARN task does NOT send alert when workflow has no warningGroupId") + public void testStartWorkflow_withTimeoutWarnTaskButNoWarningGroupId() { + final String yaml = "/it/start/workflow_with_timeout_warn_no_warning_group.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition parentWorkflow = context.getOneWorkflow(); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(parentWorkflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + // Wait long enough for timeout to occur (60s + buffer) + await() + .atMost(Duration.ofSeconds(90)) + .untilAsserted(() -> { + // Task should still be running (WARN strategy doesn't kill) + Assertions + .assertThat(repository.queryTaskInstance(workflowInstanceId)) + .hasSize(1) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()) + .isEqualTo("warn_task_no_alert_due_to_missing_warning_group"); + assertThat(taskInstance.getState()).isNotEqualTo(TaskExecutionStatus.KILL); + }); + + // NO alert should be sent because warningGroupId is null + Assertions + .assertThat(repository.queryAlert(workflowInstanceId)) + .isEmpty(); + }); + + masterContainer.assertAllResourceReleased(); + } + + @Test + @DisplayName("Test start a workflow which contains a task with timeout WARNFAILED strategy") + public void testStartWorkflow_withTimeoutWarnFailedTask() { + final String yaml = "/it/start/workflow_with_timeout_warnfailed_task.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition parentWorkflow = context.getOneWorkflow(); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(parentWorkflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + long expectedProjectCode = parentWorkflow.getProjectCode(); + long expectedWorkflowCode = parentWorkflow.getCode(); + + // Wait for the timeout to occur, alert to be sent, and task to be killed (timeout + buffer) + await() + .atMost(Duration.ofSeconds(90)) + .untilAsserted(() -> { + // Check that the task instance has been marked as FAILED due to timeout kill + Assertions + .assertThat(repository.queryTaskInstance(workflowInstanceId)) + .hasSize(1) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("warnfailed_task_with_timeout_alert_kill"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL); + }); + + // Check that the timeout alert was sent + List alerts = repository.queryAlert(workflowInstanceId); + System.out.println( + "🔍 Found " + alerts.size() + " alert(s) for workflowInstanceId=" + workflowInstanceId); + if (!alerts.isEmpty()) { + Alert alert = alerts.get(0); + System.out.println("📄 Alert details:"); + System.out.println(" - id: " + alert.getId()); + System.out.println(" - projectCode: " + alert.getProjectCode()); + System.out.println(" - workflowDefinitionCode: " + alert.getWorkflowDefinitionCode()); + System.out.println(" - alertType: " + alert.getAlertType()); + System.out.println(" - content: " + alert.getContent()); + System.out.println(" - createTime: " + alert.getCreateTime()); + } + + // Now do assertions + Assertions + .assertThat(alerts) + .hasSize(1) + .satisfiesExactly(alert -> { + assertThat(alert.getProjectCode()).isEqualTo(expectedProjectCode); + assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(expectedWorkflowCode); + assertThat(alert.getAlertType()).isEqualTo(AlertType.TASK_TIMEOUT); + }); + }); + + masterContainer.assertAllResourceReleased(); + } + + @Test + @DisplayName("Test WARNFAILED task kills the task but does NOT send alert when warningGroupId is null") + public void testStartWorkflow_withTimeoutWarnFailedTaskButNoWarningGroupId() { + final String yaml = "/it/start/workflow_with_timeout_warnfailed_no_warning_group.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition parentWorkflow = context.getOneWorkflow(); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(parentWorkflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + // Wait for timeout to trigger kill (60s timeout + buffer) + await() + .atMost(Duration.ofSeconds(90)) + .untilAsserted(() -> { + // Task should be KILLED → state = KILL + Assertions + .assertThat(repository.queryTaskInstance(workflowInstanceId)) + .hasSize(1) + .satisfiesExactly(taskInstance -> { + assertThat(taskInstance.getName()) + .isEqualTo("warnfailed_task_no_alert_due_to_missing_warning_group"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.KILL); + }); + + // NO alert should be sent because warningGroupId is null + Assertions + .assertThat(repository.queryAlert(workflowInstanceId)) + .isEmpty(); + }); + + masterContainer.assertAllResourceReleased(); + } + } diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_no_warning_group.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_no_warning_group.yaml new file mode 100644 index 000000000000..5fa16f386183 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_no_warning_group.yaml @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: Project for testing timeout alert when warningGroupId is null + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + +# Workflow WITHOUT warningGroupId (set to null/omitted) +workflows: + - name: workflow_with_timeout_warn_no_warning_group + code: 1 + version: 1 + projectCode: 1 + description: Workflow with WARN timeout task but NO warningGroupId, should NOT send alert + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + # warningGroupId is intentionally omitted (or set to null) + +tasks: + - name: warn_task_no_alert_due_to_missing_warning_group + code: 1 + version: 1 + projectCode: 1 + userId: 1 + timeoutFlag: 'OPEN' + timeoutNotifyStrategy: 'WARN' # Should trigger alert IF warningGroupId exists + timeout: 1 # 1 minute + taskType: LogicFakeTask + taskParams: '{"localParams":[],"shellScript":"sleep 70","resourceList":[]}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 \ No newline at end of file diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_task.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_task.yaml new file mode 100644 index 000000000000..726d9f8ebef2 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warn_task.yaml @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project for testing timeout warning alerts + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + +# Workflow that contains a task configured with TIMEOUT + WARN strategy +workflows: + - name: workflow_with_timeout_warn_task + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with single timeout task using WARN strategy + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + warningGroupId: 1 # Required for sending timeout alerts; must be non-null to trigger alert + +# Task that will exceed its timeout limit to trigger a WARN alert (not kill) +tasks: + - name: warn_task_with_timeout_alert + code: 1 + version: 1 + projectCode: 1 + userId: 1 + timeoutFlag: 'OPEN' # Enable timeout mechanism + timeoutNotifyStrategy: 'WARN' # Only send alert on timeout (do NOT kill the task) + timeout: 1 # Timeout after 1 minute + taskType: LogicFakeTask + taskParams: '{"localParams":[],"shellScript":"sleep 70","resourceList":[]}' # Sleeps for 70s (> timeout), simulating a hanging task + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + taskExecuteType: BATCH + +# Task dependency: this workflow starts with the timeout-warning task +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 \ No newline at end of file diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_no_warning_group.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_no_warning_group.yaml new file mode 100644 index 000000000000..aef8da2af83b --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_no_warning_group.yaml @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: Project for testing WARNFAILED timeout without warningGroupId + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + +# Workflow with WARNFAILED task but NO warningGroupId → should kill task but NOT send alert +workflows: + - name: workflow_with_timeout_warnfailed_no_warning_group + code: 1 + version: 1 + projectCode: 1 + description: WARNFAILED task with null warningGroupId, kill on timeout, no alert + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + # warningGroupId is intentionally omitted → treated as null + +tasks: + - name: warnfailed_task_no_alert_due_to_missing_warning_group + code: 1 + version: 1 + projectCode: 1 + userId: 1 + timeoutFlag: 'OPEN' + timeoutNotifyStrategy: 'WARNFAILED' # Should kill + (conditionally) alert + timeout: 1 # 1 minute timeout + taskType: LogicFakeTask + taskParams: '{"localParams":[],"shellScript":"sleep 120","resourceList":[]}' # Runs 120s > timeout + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 \ No newline at end of file diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_task.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_task.yaml new file mode 100644 index 000000000000..7e4ee8785ef1 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_timeout_warnfailed_task.yaml @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project for testing timeout WARNFAILED alerts + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + +# Workflow with WARNFAILED timeout strategy +workflows: + - name: workflow_with_timeout_warnfailed_task + code: 1 + version: 1 + projectCode: 1 + description: Workflow with task using WARNFAILED timeout strategy (alert + kill) + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + warningGroupId: 1 + +# Task that triggers WARNFAILED on timeout +tasks: + - name: warnfailed_task_with_timeout_alert_kill + code: 1 + version: 1 + projectCode: 1 + userId: 1 + timeoutFlag: 'OPEN' # Enable timeout mechanism + timeoutNotifyStrategy: 'WARNFAILED' # Only send alert and kill task on timeout + timeout: 1 # Timeout after 1 minute + taskType: LogicFakeTask + taskParams: '{"localParams":[],"shellScript":"sleep 120","resourceList":[]}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 \ No newline at end of file