Skip to content

Commit cba95a4

Browse files
Refactor: Use factory pattern for WorkflowExecutionGraph creation
Address reviewer feedback: command handlers should not be concerned with how instances are initialized.

Changes:
- Add WorkflowExecutionGraphFactory to centralize all graph creation logic
- Remove IWorkflowExecutionGraphAssembler interface (no longer needed)
- Remove assembleWorkflowExecutionGraphAssembler() from AbstractCommandHandler
- Remove createWorkflowExecutionGraphAssembler() from all command handlers
- Update WorkflowRunningStateAction to use factory for graph initialization
- Update IWorkflowExecuteContext: add getProject(), change to setWorkflowExecutionGraph()
- Update WorkflowExecuteContext to remove assembler, add setter

The factory handles all command-type-specific graph creation strategies:
- START_PROCESS: Fresh start with configured start nodes
- REPEAT_RUNNING: Rerun entire workflow
- START_FAILURE_TASK_PROCESS: Recover failed/paused tasks
- RECOVER_TOLERANCE_FAULT_PROCESS: Failover with existing task instances
- RECOVER_SERIAL_WAIT: Serial wait recovery
1 parent d7047e7 commit cba95a4

File tree

11 files changed

+373
-415
lines changed

11 files changed

+373
-415
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.dolphinscheduler.extract.master.command.ICommandParam;
3333
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
3434
import org.apache.dolphinscheduler.server.master.engine.command.ICommandHandler;
35-
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler;
3635
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
3736
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphFactory;
3837
import org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener;
@@ -81,7 +80,6 @@ public WorkflowExecutionRunnable handleCommand(final Command command) {
8180
assembleWorkflowInstance(workflowExecuteContextBuilder);
8281
assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder);
8382
assembleWorkflowEventBus(workflowExecuteContextBuilder);
84-
assembleWorkflowExecutionGraphAssembler(workflowExecuteContextBuilder);
8583

8684
final WorkflowExecutionRunnableBuilder workflowExecutionRunnableBuilder = WorkflowExecutionRunnableBuilder
8785
.builder()
@@ -126,32 +124,6 @@ protected void assembleWorkflowGraph(
126124
protected abstract void assembleWorkflowInstance(
127125
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder);
128126

129-
/**
130-
* Assemble the workflow execution graph assembler.
131-
* <p>
132-
* The assembler is used to defer the initialization of the WorkflowExecutionGraph
133-
* until the WorkflowStartLifecycleEvent is fired. This reduces transaction time
134-
* during command processing.
135-
*/
136-
protected void assembleWorkflowExecutionGraphAssembler(
137-
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
138-
final IWorkflowExecutionGraphAssembler assembler = createWorkflowExecutionGraphAssembler(
139-
workflowExecuteContextBuilder);
140-
workflowExecuteContextBuilder.setWorkflowExecutionGraphAssembler(assembler);
141-
}
142-
143-
/**
144-
* Create the workflow execution graph assembler.
145-
* <p>
146-
* Subclasses should implement this method to provide the logic for building
147-
* the WorkflowExecutionGraph. The returned assembler will be invoked when
148-
* the WorkflowStartLifecycleEvent is fired.
149-
*
150-
* @return the assembler for creating the WorkflowExecutionGraph, or null if no graph is needed
151-
*/
152-
protected abstract IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler(
153-
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder);
154-
155127
protected List<String> parseStartNodesFromWorkflowInstance(
156128
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
157129
final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,12 @@
2020
import org.apache.dolphinscheduler.common.enums.CommandType;
2121
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2222
import org.apache.dolphinscheduler.dao.entity.Command;
23-
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2423
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
25-
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
2624
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2725
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
28-
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler;
2926
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
3027

3128
import java.util.Date;
32-
import java.util.List;
3329

3430
import org.springframework.beans.factory.annotation.Autowired;
3531
import org.springframework.stereotype.Component;
@@ -43,9 +39,6 @@ public class ReRunWorkflowCommandHandler extends RunWorkflowCommandHandler {
4339
@Autowired
4440
private WorkflowInstanceDao workflowInstanceDao;
4541

46-
@Autowired
47-
private TaskInstanceDao taskInstanceDao;
48-
4942
@Autowired
5043
private MasterConfig masterConfig;
5144

@@ -80,22 +73,6 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work
8073
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
8174
}
8275

83-
@Override
84-
protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler(
85-
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
86-
// Capture parent assembler
87-
final IWorkflowExecutionGraphAssembler parentAssembler =
88-
super.createWorkflowExecutionGraphAssembler(workflowExecuteContextBuilder);
89-
final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
90-
91-
return () -> {
92-
// Mark all task instances as invalid before creating the new graph
93-
final List<TaskInstance> taskInstances = getValidTaskInstance(workflowInstance);
94-
taskInstanceDao.markTaskInstanceInvalid(taskInstances);
95-
return parentAssembler.assemble();
96-
};
97-
}
98-
9976
@Override
10077
public CommandType commandType() {
10178
return CommandType.REPEAT_RUNNING;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java

Lines changed: 0 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,14 @@
2020
import org.apache.dolphinscheduler.common.enums.CommandType;
2121
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2222
import org.apache.dolphinscheduler.dao.entity.Command;
23-
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2423
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
25-
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
2624
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
27-
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
2825
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
29-
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler;
30-
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
31-
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
32-
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
33-
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
34-
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
35-
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories;
3626
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
3727

38-
import java.util.ArrayList;
39-
import java.util.HashSet;
40-
import java.util.List;
41-
import java.util.Map;
42-
import java.util.Set;
43-
import java.util.function.BiConsumer;
44-
import java.util.function.Function;
45-
import java.util.stream.Collectors;
46-
4728
import org.springframework.beans.factory.annotation.Autowired;
48-
import org.springframework.context.ApplicationContext;
4929
import org.springframework.stereotype.Component;
5030

51-
import com.google.common.collect.Lists;
52-
5331
/**
5432
* This handler used to handle {@link CommandType#START_FAILURE_TASK_PROCESS}.
5533
* <p> Will start the failure/pause/killed and other task instance which is behind success tasks instance but not been triggered.
@@ -60,15 +38,6 @@ public class RecoverFailureTaskCommandHandler extends AbstractCommandHandler {
6038
@Autowired
6139
private WorkflowInstanceDao workflowInstanceDao;
6240

63-
@Autowired
64-
private TaskInstanceDao taskInstanceDao;
65-
66-
@Autowired
67-
private ApplicationContext applicationContext;
68-
69-
@Autowired
70-
private TaskInstanceFactories taskInstanceFactories;
71-
7241
@Autowired
7342
private MasterConfig masterConfig;
7443

@@ -101,146 +70,6 @@ protected void assembleWorkflowInstance(
10170
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
10271
}
10372

104-
@Override
105-
protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler(
106-
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
107-
// Capture the context needed for deferred graph assembly
108-
final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
109-
final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
110-
final List<String> startNodes = parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder);
111-
112-
return () -> {
113-
final Map<String, TaskInstance> taskInstanceMap = dealWithHistoryTaskInstances(
114-
workflowExecuteContextBuilder)
115-
.stream()
116-
.collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
117-
118-
final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
119-
120-
final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
121-
final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
122-
TaskExecutionRunnableBuilder
123-
.builder()
124-
.workflowExecutionGraph(workflowExecutionGraph)
125-
.workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
126-
.project(workflowExecuteContextBuilder.getProject())
127-
.workflowInstance(workflowInstance)
128-
.taskDefinition(workflowGraph.getTaskNodeByName(task))
129-
.taskInstance(taskInstanceMap.get(task))
130-
.workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
131-
.applicationContext(applicationContext)
132-
.build();
133-
workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder));
134-
workflowExecutionGraph.addEdge(task, successors);
135-
};
136-
137-
final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
138-
WorkflowGraphTopologyLogicalVisitor.builder()
139-
.taskDependType(workflowInstance.getTaskDependType())
140-
.onWorkflowGraph(workflowGraph)
141-
.fromTask(startNodes)
142-
.doVisitFunction(taskExecutionRunnableCreator)
143-
.build();
144-
workflowGraphTopologyLogicalVisitor.visit();
145-
workflowExecutionGraph.removeUnReachableEdge();
146-
147-
return workflowExecutionGraph;
148-
};
149-
}
150-
151-
/**
152-
* Return the valid task instance which should not be recovered.
153-
* <p> Will mark the failure/killed task instance as invalid.
154-
*/
155-
private List<TaskInstance> dealWithHistoryTaskInstances(
156-
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
157-
final WorkflowInstance workflowInstance = workflowExecuteContextBuilder.getWorkflowInstance();
158-
final Map<String, TaskInstance> taskInstanceMap = super.getValidTaskInstance(workflowInstance)
159-
.stream()
160-
.collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
161-
162-
final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
163-
164-
final Set<String> needRecoverTasks = new HashSet<>();
165-
final Set<String> markInvalidTasks = new HashSet<>();
166-
final BiConsumer<String, Set<String>> historyTaskInstanceMarker = (task, successors) -> {
167-
// If the parent is need recover
168-
// Then the task should mark as invalid, and it's child should be mark as invalidated.
169-
if (markInvalidTasks.contains(task)) {
170-
if (taskInstanceMap.containsKey(task)) {
171-
taskInstanceDao.markTaskInstanceInvalid(Lists.newArrayList(taskInstanceMap.get(task)));
172-
taskInstanceMap.remove(task);
173-
}
174-
markInvalidTasks.addAll(successors);
175-
return;
176-
}
177-
178-
final TaskInstance taskInstance = taskInstanceMap.get(task);
179-
if (taskInstance == null) {
180-
return;
181-
}
182-
183-
if (isTaskNeedRecreate(taskInstance) || isTaskCanRecover(taskInstance)) {
184-
needRecoverTasks.add(task);
185-
markInvalidTasks.addAll(successors);
186-
}
187-
};
188-
189-
final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
190-
WorkflowGraphTopologyLogicalVisitor.builder()
191-
.onWorkflowGraph(workflowGraph)
192-
.taskDependType(workflowInstance.getTaskDependType())
193-
.fromTask(parseStartNodesFromWorkflowInstance(workflowExecuteContextBuilder))
194-
.doVisitFunction(historyTaskInstanceMarker)
195-
.build();
196-
workflowGraphTopologyLogicalVisitor.visit();
197-
198-
for (String task : needRecoverTasks) {
199-
final TaskInstance taskInstance = taskInstanceMap.get(task);
200-
if (isTaskCanRecover(taskInstance)) {
201-
taskInstanceMap.put(task, createRecoverTaskInstance(taskInstance));
202-
continue;
203-
}
204-
if (isTaskNeedRecreate(taskInstance)) {
205-
taskInstanceMap.put(task, createRecreatedTaskInstance(taskInstance));
206-
}
207-
}
208-
return new ArrayList<>(taskInstanceMap.values());
209-
}
210-
211-
/**
212-
* Whether the task need to be recreated.
213-
* <p> If the task state is FAILURE and KILL, then will mark the task invalid and recreate the task.
214-
*/
215-
private boolean isTaskNeedRecreate(final TaskInstance taskInstance) {
216-
if (taskInstance == null) {
217-
return false;
218-
}
219-
return taskInstance.getState() == TaskExecutionStatus.FAILURE
220-
|| taskInstance.getState() == TaskExecutionStatus.KILL;
221-
}
222-
223-
private TaskInstance createRecreatedTaskInstance(final TaskInstance taskInstance) {
224-
return taskInstanceFactories.failedRecoverTaskInstanceFactory()
225-
.builder()
226-
.withTaskInstance(taskInstance)
227-
.build();
228-
}
229-
230-
private boolean isTaskCanRecover(final TaskInstance taskInstance) {
231-
if (taskInstance == null) {
232-
return false;
233-
}
234-
return taskInstance.getState() == TaskExecutionStatus.PAUSE;
235-
}
236-
237-
private TaskInstance createRecoverTaskInstance(final TaskInstance taskInstance) {
238-
return taskInstanceFactories.pauseRecoverTaskInstanceFactory()
239-
.builder()
240-
.withTaskInstance(taskInstance)
241-
.build();
242-
}
243-
24473
@Override
24574
public CommandType commandType() {
24675
return CommandType.START_FAILURE_TASK_PROCESS;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2424
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
2525
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
26-
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraphAssembler;
2726
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext;
2827

2928
import org.springframework.beans.factory.annotation.Autowired;
@@ -50,13 +49,6 @@ protected void assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteCo
5049
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
5150
}
5251

53-
@Override
54-
protected IWorkflowExecutionGraphAssembler createWorkflowExecutionGraphAssembler(
55-
WorkflowExecuteContext.WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
56-
// No execution graph needed for serial wait recovery
57-
return null;
58-
}
59-
6052
@Override
6153
public CommandType commandType() {
6254
return CommandType.RECOVER_SERIAL_WAIT;

0 commit comments

Comments
 (0)