Skip to content

Commit 8292ea0

Browse files
author
苏义超
committed
add do with initializeFailTaskIds
1 parent 91c1b57 commit 8292ea0

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,16 @@ public class WorkerGroupDispatcher extends BaseDaemonThread {
4646

4747
private final Set<Integer> waitingDispatchTaskIds;
4848

49+
private final Set<Integer> initializeFailTaskIds;
50+
4951
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
5052

5153
public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
5254
super("WorkerGroupTaskDispatcher-" + workerGroupName);
5355
this.taskExecutorClient = taskExecutorClient;
5456
this.workerGroupEventBus = new TaskDispatchableEventBus<>();
5557
this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
58+
this.initializeFailTaskIds = ConcurrentHashMap.newKeySet();
5659
log.info("Initialize WorkerGroupDispatcher: {}", this.getName());
5760
}
5861

@@ -139,4 +142,12 @@ public synchronized void close() {
139142
int queueSize() {
140143
return this.workerGroupEventBus.size();
141144
}
145+
146+
public void addInitializeFailTask(ITaskExecutionRunnable taskExecutionRunnable) {
147+
initializeFailTaskIds.add(taskExecutionRunnable.getId());
148+
}
149+
150+
public boolean removeInitializeFailTask(ITaskExecutionRunnable taskExecutionRunnable) {
151+
return initializeFailTaskIds.remove(taskExecutionRunnable.getId());
152+
}
142153
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,15 @@ public void dispatchTask(final ITaskExecutionRunnable taskExecutionRunnable,
6666
*/
6767
public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
6868
final String workerGroup = taskExecutionRunnable.getTaskInstance().getWorkerGroup();
69-
boolean removed = getOrCreateWorkerGroupDispatcher(workerGroup).removeTask(taskExecutionRunnable);
69+
70+
boolean removed = getOrCreateWorkerGroupDispatcher(workerGroup).removeInitializeFailTask(taskExecutionRunnable);
71+
if (removed) {
72+
log.info("Success removed Initialize Fail Task[id={}] from WorkerGroupDispatcher[name={}]",
73+
taskExecutionRunnable.getId(), workerGroup);
74+
return removed;
75+
}
76+
77+
removed = getOrCreateWorkerGroupDispatcher(workerGroup).removeTask(taskExecutionRunnable);
7078
if (removed) {
7179
log.info("Success removed Task[id={}] from WorkerGroupDispatcher[name={}]",
7280
taskExecutionRunnable.getId(), workerGroup);
@@ -97,6 +105,13 @@ public void close() throws Exception {
97105
log.info("WorkerGroupDispatcherCoordinator closed...");
98106
}
99107

108+
public void addInitializeFailTask(ITaskExecutionRunnable taskExecutionRunnable) {
109+
final String workerGroup = taskExecutionRunnable.getTaskInstance().getWorkerGroup();
110+
getOrCreateWorkerGroupDispatcher(workerGroup).addInitializeFailTask(taskExecutionRunnable);
111+
log.info("add Initialize Fail Task[id={}] to WorkerGroupDispatcher[name={}]", taskExecutionRunnable.getId(),
112+
workerGroup);
113+
}
114+
100115
private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String workerGroup) {
101116
return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
102117
WorkerGroupDispatcher workerGroupDispatcher = new WorkerGroupDispatcher(wg, taskExecutorClient);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskSubmittedStateAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,14 @@ public void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRu
109109
taskInstance.getDelayTime(),
110110
remainTimeMills);
111111
}
112-
taskExecutionRunnable.initializeTaskExecutionContext();
112+
113+
try {
114+
taskExecutionRunnable.initializeTaskExecutionContext();
115+
} catch (Exception ex) {
116+
log.error("Current taskInstance: {} initializeTaskExecutionContext error", taskInstance.getName(), ex);
117+
workerGroupDispatcherCoordinator.addInitializeFailTask(taskExecutionRunnable);
118+
return;
119+
}
113120
workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, remainTimeMills);
114121
}
115122

0 commit comments

Comments
 (0)