Skip to content

Commit 42f3356

Browse files
committed
Add MasterCoordinator
1 parent 271fa23 commit 42f3356

File tree

8 files changed

+147
-32
lines changed

8 files changed

+147
-32
lines changed

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskGroupQueueStatus.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public enum TaskGroupQueueStatus {
2828

2929
WAIT_QUEUE(-1, "wait queue"),
3030
ACQUIRE_SUCCESS(1, "acquire success"),
31+
@Deprecated
3132
RELEASE(2, "release");
3233

3334
@EnumValue

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@
2424
@NoArgsConstructor
2525
public class MasterHeartBeat extends BaseHeartBeat implements HeartBeat {
2626

27+
private boolean isCoordinator;
28+
2729
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
3434
import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
3535
import org.apache.dolphinscheduler.server.master.cluster.ClusterStateMonitors;
36+
import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
3637
import org.apache.dolphinscheduler.server.master.engine.WorkflowEngine;
3738
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
3839
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBusFireWorker;
@@ -95,6 +96,9 @@ public class MasterServer implements IStoppable {
9596
@Autowired
9697
private SystemEventBusFireWorker systemEventBusFireWorker;
9798

99+
@Autowired
100+
private MasterCoordinator masterCoordinator;
101+
98102
public static void main(String[] args) {
99103
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
100104

@@ -122,6 +126,8 @@ public void initialized() {
122126
this.masterRegistryClient.start();
123127
this.masterRegistryClient.setRegistryStoppable(this);
124128

129+
this.masterCoordinator.start();
130+
125131
this.clusterManager.start();
126132
this.clusterStateMonitors.start();
127133

@@ -173,6 +179,7 @@ public void close(String cause) {
173179
WorkflowEngine workflowEngine1 = workflowEngine;
174180
SchedulerApi closedSchedulerApi = schedulerApi;
175181
MasterRpcServer closedRpcServer = masterRPCServer;
182+
MasterCoordinator closeMasterCoordinator = masterCoordinator;
176183
MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
177184
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
178185
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine;
19+
20+
import org.apache.dolphinscheduler.registry.api.Registry;
21+
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
22+
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
23+
import org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
24+
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
25+
26+
import lombok.extern.slf4j.Slf4j;
27+
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.stereotype.Component;
30+
31+
/**
32+
* The MasterCoordinator is singleton at the clusters, which is used to do some control work, e.g manage the {@link TaskGroupCoordinator}
33+
*/
34+
@Slf4j
35+
@Component
36+
public class MasterCoordinator extends AbstractHAServer {
37+
38+
@Autowired
39+
private TaskGroupCoordinator taskGroupCoordinator;
40+
41+
public MasterCoordinator(final Registry registry, final MasterConfig masterConfig) {
42+
super(
43+
registry,
44+
RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
45+
masterConfig.getMasterAddress());
46+
47+
addServerStatusChangeListener(new AbstractServerStatusChangeListener() {
48+
49+
@Override
50+
public void changeToActive() {
51+
onActive();
52+
}
53+
54+
@Override
55+
public void changeToStandBy() {
56+
onStandBy();
57+
}
58+
});
59+
}
60+
61+
@Override
62+
public void start() {
63+
super.start();
64+
log.info("MasterCoordinator started...");
65+
}
66+
67+
@Override
68+
public void close() {
69+
taskGroupCoordinator.close();
70+
log.info("MasterCoordinator shutdown...");
71+
}
72+
73+
private void onActive() {
74+
taskGroupCoordinator.start();
75+
}
76+
77+
private void onStandBy() {
78+
taskGroupCoordinator.close();
79+
}
80+
81+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcquireSuccessNotifyResponse;
3838
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
3939
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
40-
import org.apache.dolphinscheduler.registry.api.RegistryClient;
41-
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
4240
import org.apache.dolphinscheduler.server.master.utils.TaskGroupUtils;
4341

4442
import org.apache.commons.collections4.CollectionUtils;
@@ -47,6 +45,7 @@
4745
import java.util.Date;
4846
import java.util.List;
4947
import java.util.Map;
48+
import java.util.concurrent.TimeUnit;
5049
import java.util.function.Function;
5150
import java.util.stream.Collectors;
5251

@@ -79,10 +78,7 @@
7978
*/
8079
@Slf4j
8180
@Component
82-
public class TaskGroupCoordinator extends BaseDaemonThread implements AutoCloseable {
83-
84-
@Autowired
85-
private RegistryClient registryClient;
81+
public class TaskGroupCoordinator implements AutoCloseable {
8682

8783
@Autowired
8884
private TaskGroupDao taskGroupDao;
@@ -98,38 +94,42 @@ public class TaskGroupCoordinator extends BaseDaemonThread implements AutoClosea
9894

9995
private boolean flag = true;
10096

101-
private static final int DEFAULT_LIMIT = 1000;
97+
private Thread internalThread;
10298

103-
public TaskGroupCoordinator() {
104-
super("TaskGroupCoordinator");
105-
}
99+
private static final int DEFAULT_LIMIT = 1000;
106100

107-
@Override
108101
public synchronized void start() {
109102
log.info("TaskGroupCoordinator starting...");
103+
if (flag) {
104+
throw new IllegalStateException("TaskGroupCoordinator is already started");
105+
}
106+
if (internalThread != null) {
107+
throw new IllegalStateException("InternalThread is already started");
108+
}
110109
flag = true;
111-
super.start();
110+
internalThread = new BaseDaemonThread(this::doStart) {
111+
};
112+
internalThread.start();
112113
log.info("TaskGroupCoordinator started...");
113114
}
114115

115-
@Override
116-
public void run() {
116+
private void doStart() {
117+
// Sleep 1 minutes here to make sure the previous task group slot has been released.
118+
// This step is not necessary, since the wakeup operation is idempotent, but we can avoid confusion warning.
119+
ThreadUtils.sleep(TimeUnit.MINUTES.toMillis(1));
120+
117121
while (flag) {
118122
try {
119-
registryClient.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
120-
try {
121-
StopWatch taskGroupCoordinatorRoundCost = StopWatch.createStarted();
123+
final StopWatch taskGroupCoordinatorRoundCost = StopWatch.createStarted();
122124

123-
amendTaskGroupUseSize();
124-
amendTaskGroupQueueStatus();
125-
dealWithForceStartTaskGroupQueue();
126-
dealWithWaitingTaskGroupQueue();
125+
//
126+
amendTaskGroupUseSize();
127+
amendTaskGroupQueueStatus();
128+
dealWithForceStartTaskGroupQueue();
129+
dealWithWaitingTaskGroupQueue();
127130

128-
taskGroupCoordinatorRoundCost.stop();
129-
log.debug("TaskGroupCoordinator round cost: {}/ms", taskGroupCoordinatorRoundCost.getTime());
130-
} finally {
131-
registryClient.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
132-
}
131+
taskGroupCoordinatorRoundCost.stop();
132+
log.debug("TaskGroupCoordinator round cost: {}/ms", taskGroupCoordinatorRoundCost.getTime());
133133
} catch (Throwable e) {
134134
log.error("TaskGroupCoordinator error", e);
135135
} finally {
@@ -212,7 +212,6 @@ private void amendTaskGroupQueueStatus(List<TaskGroupQueue> taskGroupQueues) {
212212
log.warn("The TaskInstance: {} state: {} finished, will release the TaskGroupQueue: {}",
213213
taskInstance.getName(), taskInstance.getState(), taskGroupQueue);
214214
deleteTaskGroupQueueSlot(taskGroupQueue);
215-
continue;
216215
}
217216
}
218217
}
@@ -226,7 +225,7 @@ private void dealWithForceStartTaskGroupQueue() {
226225
int limit = DEFAULT_LIMIT;
227226
StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted();
228227
while (true) {
229-
List<TaskGroupQueue> taskGroupQueues =
228+
final List<TaskGroupQueue> taskGroupQueues =
230229
taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(minTaskGroupQueueId, limit);
231230
if (CollectionUtils.isEmpty(taskGroupQueues)) {
232231
break;
@@ -245,7 +244,7 @@ private void dealWithForceStartTaskGroupQueue(List<TaskGroupQueue> taskGroupQueu
245244
// Find the force start task group queue(Which is inQueue and forceStart is YES)
246245
// Notify the related waiting task instance
247246
// Set the taskGroupQueue status to RELEASE and remove it from queue
248-
for (TaskGroupQueue taskGroupQueue : taskGroupQueues) {
247+
for (final TaskGroupQueue taskGroupQueue : taskGroupQueues) {
249248
try {
250249
LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId());
251250
// notify the waiting task instance
@@ -488,8 +487,21 @@ private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
488487
}
489488

490489
@Override
491-
public void close() throws Exception {
490+
public synchronized void close() {
491+
if (!flag) {
492+
log.error("TaskGroupCoordinator is already closed");
493+
return;
494+
}
492495
flag = false;
496+
try {
497+
if (internalThread != null) {
498+
internalThread.interrupt();
499+
internalThread.join();
500+
}
501+
} catch (Exception ex) {
502+
log.error("Close internalThread failed", ex);
503+
}
504+
internalThread = null;
493505
log.info("TaskGroupCoordinator closed");
494506
}
495507
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
3131
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
3232
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
33+
import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
3334
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
3435

3536
import lombok.NonNull;
@@ -44,17 +45,21 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
4445

4546
private final RegistryClient registryClient;
4647

48+
private final MasterCoordinator masterCoordinator;
49+
4750
private final String heartBeatPath;
4851

4952
private final int processId;
5053

5154
public MasterHeartBeatTask(@NonNull MasterConfig masterConfig,
5255
@NonNull MetricsProvider metricsProvider,
53-
@NonNull RegistryClient registryClient) {
56+
@NonNull RegistryClient registryClient,
57+
@NonNull MasterCoordinator masterCoordinator) {
5458
super("MasterHeartBeatTask", masterConfig.getMaxHeartbeatInterval().toMillis());
5559
this.masterConfig = masterConfig;
5660
this.metricsProvider = metricsProvider;
5761
this.registryClient = registryClient;
62+
this.masterCoordinator = masterCoordinator;
5863
this.heartBeatPath = masterConfig.getMasterRegistryPath();
5964
this.processId = OSUtils.getProcessID();
6065
}
@@ -75,6 +80,7 @@ public MasterHeartBeat getHeartBeat() {
7580
.serverStatus(serverStatus)
7681
.host(NetUtils.getHost())
7782
.port(masterConfig.getListenPort())
83+
.isCoordinator(masterCoordinator.isActive())
7884
.build();
7985
}
8086

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.dolphinscheduler.registry.api.RegistryException;
3131
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
3232
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
33+
import org.apache.dolphinscheduler.server.master.engine.MasterCoordinator;
3334

3435
import lombok.extern.slf4j.Slf4j;
3536

@@ -53,11 +54,15 @@ public class MasterRegistryClient implements AutoCloseable {
5354
@Autowired
5455
private MetricsProvider metricsProvider;
5556

57+
@Autowired
58+
private MasterCoordinator masterCoordinator;
59+
5660
private MasterHeartBeatTask masterHeartBeatTask;
5761

5862
public void start() {
5963
try {
60-
this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, metricsProvider, registryClient);
64+
this.masterHeartBeatTask =
65+
new MasterHeartBeatTask(masterConfig, metricsProvider, registryClient, masterCoordinator);
6166
// master registry
6267
registry();
6368
registryClient.addConnectionStateListener(new MasterConnectionStateListener(registryClient));

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public enum RegistryNodeType {
2828

2929
MASTER("Master", "/nodes/master"),
3030
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
31+
MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),
3132
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"),
3233
MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator", "/lock/master-serial-workflow-coordinator"),
3334

0 commit comments

Comments
 (0)