diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 66dcb4aeb1ab..708c8ad6b8a0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -244,6 +244,13 @@ private void saveTaskTimeoutAlert(Alert alert, String content, int alertGroupId) public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance, TaskInstance taskInstance, ProjectUser projectUser) { + if (projectUser == null) { + throw new IllegalArgumentException("projectUser must not be null"); + } + if (workflowInstance.getWarningGroupId() == null) { + throw new IllegalArgumentException("warningGroupId of the workflow instance must not be null"); + } + Alert alert = new Alert(); List workflowAlertContentList = new ArrayList<>(1); WorkflowAlertContent workflowAlertContent = WorkflowAlertContent.builder() diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java index 9caa42f1e9e3..feeac570b4e8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectDao.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; import java.util.Collection; import java.util.List; @@ -28,4 +29,5 @@ public interface ProjectDao extends IDao { Project queryByCode(Long projectCode); + ProjectUser queryProjectWithUserByWorkflowInstanceId(int workflowInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java index f3f0c3b03935..81cc8a3dde61 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectDaoImpl.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao.repository.impl; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.ProjectDao; @@ -45,4 +46,9 @@ public List queryByCodes(Collection projectCodes) { public Project queryByCode(Long projectCode) { return mybatisMapper.queryByCode(projectCode); } + + @Override + public ProjectUser queryProjectWithUserByWorkflowInstanceId(int workflowInstanceId) { + return mybatisMapper.queryProjectWithUserByWorkflowInstanceId(workflowInstanceId); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java index d406168c0743..5dfb0ec8397e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/handler/TaskTimeoutLifecycleEventHandler.java @@ -62,10 +62,18 @@ public void handle(final ITaskStateAction taskStateAction, log.info("The task {} TimeoutStrategy is null.", taskName); return; } + + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + final boolean shouldSendAlert = workflowInstance.getWarningGroupId() != null; + switch (timeoutNotifyStrategy) { case WARN: log.info("The task {} TimeoutStrategy is WARN, try to send a timeout alert.", taskName); - doTaskTimeoutAlert(taskExecutionRunnable); + if (shouldSendAlert) { + doTaskTimeoutAlert(taskExecutionRunnable); + } else { + log.info("Skipped sending timeout alert for task {} because warningGroupId is null.", taskName); + } break; case FAILED: log.info("The task {} TimeoutStrategy is FAILED, try to publish a kill event.", taskName); @@ -76,7 +84,11 @@ public void handle(final ITaskStateAction taskStateAction, "The task {} TimeoutStrategy is WARNFAILED, try to publish a kill event and send a timeout alert.", taskName); doTaskTimeoutKill(taskExecutionRunnable); - doTaskTimeoutAlert(taskExecutionRunnable); + if (shouldSendAlert) { + doTaskTimeoutAlert(taskExecutionRunnable); + } else { + log.info("Skipped sending timeout alert for task {} because warningGroupId is null.", taskName); + } default: log.warn("The task {} TimeoutStrategy is invalided.", taskName); break; @@ -90,8 +102,7 @@ private void doTaskTimeoutKill(final ITaskExecutionRunnable taskExecutionRunnabl private void doTaskTimeoutAlert(final ITaskExecutionRunnable taskExecutionRunnable) { final WorkflowInstance workflowInstance = taskExecutionRunnable.getWorkflowInstance(); final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); - // todo: inject the projectUser - workflowAlertManager.sendTaskTimeoutAlert(workflowInstance, taskInstance, null); + workflowAlertManager.sendTaskTimeoutAlert(workflowInstance, taskInstance); } @Override diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java index b792ee508962..bbe3ec3b07b7 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java @@ -260,8 +260,8 @@ public boolean isNeedToSendWarning(WorkflowInstance workflowInstance) { } public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance, - TaskInstance taskInstance, - ProjectUser projectUser) { + TaskInstance taskInstance) { + ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId()); alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser); } }