Skip to content

Commit 6b03838

Browse files
authored
[Fix-17834] [Master] Fix workflow failure strategy cannot work (#17851)
1 parent f21550f commit 6b03838

File tree

19 files changed

+353
-32
lines changed

19 files changed

+353
-32
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/ITaskExecutionRunnable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ default String getName() {
6767
*/
6868
boolean isTaskInstanceCanRetry();
6969

70+
boolean isFailure();
71+
7072
/**
7173
* Retry the TaskExecutionRunnable.
7274
* <p> Will create retry task instance and start it.

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public boolean isTaskInstanceCanRetry() {
9696
return taskInstance.getRetryTimes() < taskInstance.getMaxRetryTimes();
9797
}
9898

99+
@Override
100+
public boolean isFailure() {
101+
return isTaskInstanceInitialized() && !isTaskInstanceCanRetry() && taskInstance.getState().isFailure();
102+
}
103+
99104
@Override
100105
public void retry() {
101106
checkState(isTaskInstanceInitialized(), "The task instance is not initialized, can't initialize retry task.");

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void onPausedEvent(final IWorkflowExecutionRunnable workflowExecutionRunn
136136
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
137137
persistentTaskInstancePausedEventToDB(taskExecutionRunnable, taskPausedEvent);
138138
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainPause(taskExecutionRunnable);
139-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
139+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
140140
}
141141

142142
private void persistentTaskInstancePausedEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
@@ -153,7 +153,7 @@ public void onKilledEvent(final IWorkflowExecutionRunnable workflowExecutionRunn
153153
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
154154
persistentTaskInstanceKilledEventToDB(taskExecutionRunnable, taskInstanceKillEvent);
155155
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainKill(taskExecutionRunnable);
156-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
156+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
157157
}
158158

159159
private void persistentTaskInstanceKilledEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
@@ -181,11 +181,12 @@ public void onFailedEvent(final IWorkflowExecutionRunnable workflowExecutionRunn
181181
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
182182
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
183183
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
184-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
184+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
185185
return;
186186
}
187+
// todo: Use Plugin to extend the act strategy on xxEvent.
187188
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
188-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
189+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
189190
}
190191

191192
private void persistentTaskInstanceFailedEventToDB(final ITaskExecutionRunnable taskExecutionRunnable,
@@ -203,7 +204,7 @@ public void onSucceedEvent(final IWorkflowExecutionRunnable workflowExecutionRun
203204
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
204205
persistentTaskInstanceSuccessEventToDB(taskExecutionRunnable, taskSuccessEvent);
205206
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
206-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
207+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
207208
}
208209

209210
protected void mergeTaskVarPoolToWorkflow(final IWorkflowExecutionRunnable workflowExecutionRunnable,
@@ -244,9 +245,9 @@ protected void tryToDispatchTask(final ITaskExecutionRunnable taskExecutionRunna
244245
taskExecutionRunnable.getWorkflowEventBus().publish(TaskDispatchLifecycleEvent.of(taskExecutionRunnable));
245246
}
246247

247-
protected void publishWorkflowInstanceTopologyLogicalTransitionEvent(final ITaskExecutionRunnable taskExecutionRunnable) {
248-
final Integer workflowInstanceId = taskExecutionRunnable.getWorkflowInstance().getId();
249-
final IWorkflowExecutionRunnable workflowExecutionRunnable = workflowRepository.get(workflowInstanceId);
248+
protected void publishWorkflowInstanceTopologyLogicalTransitionEvent(
249+
final IWorkflowExecutionRunnable workflowExecutionRunnable,
250+
final ITaskExecutionRunnable taskExecutionRunnable) {
250251
taskExecutionRunnable
251252
.getWorkflowEventBus()
252253
.publish(

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskKillStateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void onStartEvent(final IWorkflowExecutionRunnable workflowExecutionRunna
4747
final TaskStartLifecycleEvent taskStartEvent) {
4848
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
4949
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainKill(taskExecutionRunnable);
50-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
50+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
5151
}
5252

5353
@Override

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskPauseStateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void onStartEvent(final IWorkflowExecutionRunnable workflowExecutionRunna
4747
final TaskStartLifecycleEvent taskStartEvent) {
4848
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
4949
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainPause(taskExecutionRunnable);
50-
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
50+
publishWorkflowInstanceTopologyLogicalTransitionEvent(workflowExecutionRunnable, taskExecutionRunnable);
5151
}
5252

5353
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
21+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
22+
23+
/**
24+
* The strategy used to deal with {@link org.apache.dolphinscheduler.common.enums.FailureStrategy#CONTINUE} when task failure occurs.
25+
* <p> Will wait the active the tasks finished.
26+
*/
27+
public class ContinueWorkflowFailureStrategy implements IWorkflowFailureStrategy {
28+
29+
@Override
30+
public void onTaskFailure(IWorkflowExecutionRunnable workflowExecutionRunnable,
31+
ITaskExecutionRunnable taskExecutionRunnable) {
32+
// do nothing, just continue workflow execution
33+
}
34+
35+
@Override
36+
public boolean canTriggerSuccessor(IWorkflowExecutionRunnable workflowExecutionRunnable,
37+
ITaskExecutionRunnable taskExecutionRunnable) {
38+
return true;
39+
}
40+
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
21+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
22+
23+
import lombok.extern.slf4j.Slf4j;
24+
25+
/**
26+
* The strategy used to deal with {@link org.apache.dolphinscheduler.common.enums.FailureStrategy#END} when task failure occurs.
27+
* <p> Will kill the active tasks and end the workflow execution.
28+
*/
29+
@Slf4j
30+
public class EndWorkflowFailureStrategy implements IWorkflowFailureStrategy {
31+
32+
@Override
33+
public void onTaskFailure(IWorkflowExecutionRunnable workflowExecutionRunnable,
34+
ITaskExecutionRunnable taskExecutionRunnable) {
35+
log.info("The workflow instance: [{}] using END failure strategy, will kill the active tasks.",
36+
workflowExecutionRunnable.getName());
37+
workflowExecutionRunnable.killActiveTasks();
38+
}
39+
40+
@Override
41+
public boolean canTriggerSuccessor(IWorkflowExecutionRunnable workflowExecutionRunnable,
42+
ITaskExecutionRunnable taskExecutionRunnable) {
43+
return !workflowExecutionRunnable.getWorkflowExecutionGraph().isExistFailureTaskExecutionRunnableChain();
44+
}
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
19+
20+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
21+
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
22+
23+
/**
24+
* Used to deal with {@link org.apache.dolphinscheduler.common.enums.FailureStrategy} when task failure occurs
25+
*/
26+
public interface IWorkflowFailureStrategy {
27+
28+
void onTaskFailure(IWorkflowExecutionRunnable workflowExecutionRunnable,
29+
ITaskExecutionRunnable taskExecutionRunnable);
30+
31+
boolean canTriggerSuccessor(IWorkflowExecutionRunnable workflowExecutionRunnable,
32+
ITaskExecutionRunnable taskExecutionRunnable);
33+
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.workflow.policy;
19+
20+
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
21+
22+
public class WorkflowFailureStrategyFactory {
23+
24+
public static IWorkflowFailureStrategy getStrategy(final FailureStrategy failureStrategy) {
25+
if (failureStrategy == FailureStrategy.END) {
26+
return new EndWorkflowFailureStrategy();
27+
}
28+
return new ContinueWorkflowFailureStrategy();
29+
}
30+
31+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/IWorkflowExecutionRunnable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2222
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
2323
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
24+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2425
import org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener;
26+
import org.apache.dolphinscheduler.server.master.engine.workflow.policy.IWorkflowFailureStrategy;
2527
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
2628

2729
import java.util.List;
@@ -68,6 +70,13 @@ default boolean isWorkflowReadyStop() {
6870
return workflowExecutionStatus == WorkflowExecutionStatus.READY_STOP;
6971
}
7072

73+
/**
74+
* Kill the active tasks of the WorkflowExecutionRunnable.
75+
*/
76+
default void killActiveTasks() {
77+
getWorkflowExecutionGraph().getActiveTaskExecutionRunnable().forEach(ITaskExecutionRunnable::kill);
78+
}
79+
7180
/**
7281
* Get the WorkflowExecuteContext belongs to the WorkflowExecutionRunnable.
7382
*/
@@ -111,4 +120,5 @@ default IWorkflowExecutionGraph getWorkflowExecutionGraph() {
111120
*/
112121
void registerWorkflowInstanceLifecycleListener(IWorkflowLifecycleListener listener);
113122

123+
IWorkflowFailureStrategy getWorkflowFailureStrategy();
114124
}

0 commit comments

Comments
 (0)