Skip to content

Commit 0b705ad

Browse files
Zzih96ruanwenjun
authored and committed
add workflow timeout
1 parent 6823d26 commit 0b705ad

File tree

10 files changed

+625
-20
lines changed

10 files changed

+625
-20
lines changed

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,10 @@ public void sendServerStoppedAlert(String host, String serverType) {
192192
* workflow time out alert
193193
*
194194
* @param workflowInstance workflowInstance
195-
* @param projectUser projectUser
195+
* @param projectUser projectUser
196+
* @param modifyBy modifyBy
196197
*/
197-
public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) {
198+
public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser, String modifyBy) {
198199
int alertGroupId = workflowInstance.getWarningGroupId();
199200
Alert alert = new Alert();
200201
List<WorkflowAlertContent> workflowAlertContentList = new ArrayList<>(1);
@@ -207,6 +208,8 @@ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectU
207208
.workflowInstanceName(workflowInstance.getName())
208209
.commandType(workflowInstance.getCommandType())
209210
.workflowExecutionStatus(workflowInstance.getState())
211+
.modifyBy(modifyBy)
212+
.recovery(workflowInstance.getRecovery())
210213
.runTimes(workflowInstance.getRunTimes())
211214
.workflowStartTime(workflowInstance.getStartTime())
212215
.workflowHost(workflowInstance.getHost())

dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/AlertDaoTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,16 @@
1818
package org.apache.dolphinscheduler.dao.repository.impl;
1919

2020
import org.apache.dolphinscheduler.common.enums.AlertStatus;
21+
import org.apache.dolphinscheduler.common.enums.AlertType;
22+
import org.apache.dolphinscheduler.common.enums.CommandType;
23+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2124
import org.apache.dolphinscheduler.dao.AlertDao;
2225
import org.apache.dolphinscheduler.dao.BaseDaoTest;
2326
import org.apache.dolphinscheduler.dao.entity.Alert;
27+
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
28+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2429

30+
import java.util.Date;
2531
import java.util.List;
2632

2733
import org.junit.jupiter.api.Assertions;
@@ -66,4 +72,49 @@ void testSendServerStoppedAlert() {
6672
.count();
6773
Assertions.assertEquals(1L, count);
6874
}
75+
76+
@Test
77+
void testSendWorkflowTimeoutAlert() {
78+
WorkflowInstance workflowInstance = new WorkflowInstance();
79+
workflowInstance.setId(1);
80+
workflowInstance.setName("test-workflow-timeout");
81+
workflowInstance.setWorkflowDefinitionCode(100L);
82+
workflowInstance.setCommandType(CommandType.START_PROCESS);
83+
workflowInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
84+
workflowInstance.setStartTime(new Date());
85+
workflowInstance.setHost("localhost");
86+
workflowInstance.setWarningGroupId(1);
87+
88+
ProjectUser projectUser = new ProjectUser();
89+
projectUser.setProjectCode(1L);
90+
projectUser.setProjectName("test-project");
91+
projectUser.setUserName("admin");
92+
93+
alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser, "admin");
94+
95+
List<Alert> alerts = alertDao.listPendingAlerts(-1);
96+
Assertions.assertNotNull(alerts);
97+
98+
long timeoutAlertCount = alerts.stream()
99+
.filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType()))
100+
.filter(alert -> alert.getWorkflowInstanceId() != null
101+
&& alert.getWorkflowInstanceId().equals(workflowInstance.getId()))
102+
.count();
103+
Assertions.assertEquals(1L, timeoutAlertCount);
104+
105+
Alert timeoutAlert = alerts.stream()
106+
.filter(alert -> AlertType.WORKFLOW_INSTANCE_TIMEOUT.equals(alert.getAlertType()))
107+
.filter(alert -> alert.getWorkflowInstanceId() != null
108+
&& alert.getWorkflowInstanceId().equals(workflowInstance.getId()))
109+
.findFirst()
110+
.orElse(null);
111+
112+
Assertions.assertNotNull(timeoutAlert);
113+
Assertions.assertEquals("Workflow Timeout Warn", timeoutAlert.getTitle());
114+
Assertions.assertEquals(projectUser.getProjectCode(), timeoutAlert.getProjectCode());
115+
Assertions.assertEquals(workflowInstance.getWorkflowDefinitionCode(),
116+
timeoutAlert.getWorkflowDefinitionCode());
117+
Assertions.assertEquals(workflowInstance.getId(), timeoutAlert.getWorkflowInstanceId());
118+
Assertions.assertTrue(timeoutAlert.getContent().contains("test-workflow-timeout"));
119+
}
69120
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,9 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType {
5757
* Finalize the workflow instance.
5858
*/
5959
FINALIZE,
60+
/**
61+
* The workflow instance has exceeded its configured timeout.
62+
*/
63+
TIMEOUT,
6064

6165
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event;
19+
20+
import static com.google.common.base.Preconditions.checkState;
21+
22+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
23+
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
24+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent;
25+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
26+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
27+
28+
import java.util.concurrent.TimeUnit;
29+
30+
import lombok.Getter;
31+
32+
@Getter
33+
public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent {
34+
35+
private final IWorkflowExecutionRunnable workflowExecutionRunnable;
36+
37+
protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
38+
final long timeout) {
39+
super(timeout);
40+
this.workflowExecutionRunnable = workflowExecutionRunnable;
41+
}
42+
43+
public static WorkflowTimeoutLifecycleEvent of(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
44+
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
45+
checkState(workflowInstance != null, "The workflow instance must be initialized before timeout monitoring.");
46+
47+
final int timeout = workflowInstance.getTimeout();
48+
checkState(timeout >= 0, "The workflow timeout: %s must >=0 minutes", timeout);
49+
50+
// Calculate remaining time until timeout: timeout - elapsed time
51+
long delayTime = TimeUnit.MINUTES.toMillis(timeout)
52+
- (System.currentTimeMillis() - workflowInstance.getStartTime().getTime());
53+
// Ensure delayTime is not negative (trigger immediately if already timeout)
54+
delayTime = Math.max(0, delayTime);
55+
return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime);
56+
}
57+
58+
@Override
59+
public ILifecycleEventType getEventType() {
60+
return WorkflowLifecycleEventType.TIMEOUT;
61+
}
62+
63+
@Override
64+
public String toString() {
65+
return "WorkflowTimeoutLifecycleEvent{" +
66+
"workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() +
67+
", timeout=" + delayTime +
68+
'}';
69+
}
70+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;
1919

20+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2021
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
2122
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
2223
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
24+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
2325
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
2426
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
2527

@@ -37,12 +39,25 @@ public class WorkflowStartLifecycleEventHandler
3739
public void handle(final IWorkflowStateAction workflowStateAction,
3840
final IWorkflowExecutionRunnable workflowExecutionRunnable,
3941
final WorkflowStartLifecycleEvent workflowStartEvent) {
40-
42+
workflowTimeoutMonitor(workflowExecutionRunnable);
4143
workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent);
4244
}
4345

4446
@Override
4547
public ILifecycleEventType matchEventType() {
4648
return WorkflowLifecycleEventType.START;
4749
}
50+
51+
private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
52+
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
53+
if (workflowInstance.getTimeout() <= 0) {
54+
log.debug("The workflow {} timeout {} is not configured, skip timeout monitor.",
55+
workflowInstance.getName(),
56+
workflowInstance.getTimeout());
57+
return;
58+
}
59+
workflowExecutionRunnable.getWorkflowEventBus()
60+
.publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable));
61+
}
62+
4863
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;
19+
20+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
21+
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
22+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
23+
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
24+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
25+
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
26+
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;
27+
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
import org.springframework.stereotype.Component;
31+
32+
@Slf4j
33+
@Component
34+
public class WorkflowTimeoutLifecycleEventHandler
35+
extends
36+
AbstractWorkflowLifecycleEventHandler<WorkflowTimeoutLifecycleEvent> {
37+
38+
private final WorkflowAlertManager workflowAlertManager;
39+
40+
public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) {
41+
this.workflowAlertManager = workflowAlertManager;
42+
}
43+
44+
@Override
45+
public void handle(final IWorkflowStateAction workflowStateAction,
46+
final IWorkflowExecutionRunnable workflowExecutionRunnable,
47+
final WorkflowTimeoutLifecycleEvent workflowTimeoutEvent) {
48+
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
49+
final String workflowName = workflowExecutionRunnable.getName();
50+
51+
// Check if workflow is still active (not finished)
52+
if (workflowInstance.getState().isFinalState()) {
53+
log.info("The workflow {} has been finished with state: {}, skip timeout alert.",
54+
workflowName,
55+
workflowInstance.getState().name());
56+
return;
57+
}
58+
59+
log.info("The workflow {} has timeout, try to send a timeout alert.", workflowName);
60+
doWorkflowTimeoutAlert(workflowInstance);
61+
}
62+
63+
private void doWorkflowTimeoutAlert(final WorkflowInstance workflowInstance) {
64+
// ProjectUser will be built in WorkflowAlertManager
65+
workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance, null);
66+
}
67+
68+
@Override
69+
public ILifecycleEventType matchEventType() {
70+
return WorkflowLifecycleEventType.TIMEOUT;
71+
}
72+
}

0 commit comments

Comments
 (0)