Skip to content

Commit b6b0461

Browse files
ruanwenjun and SbloodyS authored
[Fix-16942] Fix global master failover might cause master dead (#16953)
Co-authored-by: xiangzihao <460888207@qq.com>
1 parent 890376a commit b6b0461

File tree

9 files changed

+78
-48
lines changed

9 files changed

+78
-48
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
@SuperBuilder
2929
public abstract class BaseServerMetadata implements IClusters.IServerMetadata {
3030

31+
private final int processId;
32+
3133
// The server startup time in milliseconds.
3234
private final long serverStartupTime;
3335

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class MasterServerMetadata extends BaseServerMetadata implements Comparab
3333

3434
public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
3535
return MasterServerMetadata.builder()
36+
.processId(masterHeartBeat.getProcessId())
3637
.serverStartupTime(masterHeartBeat.getStartupTime())
3738
.address(masterHeartBeat.getHost() + Constants.COLON + masterHeartBeat.getPort())
3839
.cpuUsage(masterHeartBeat.getCpuUsage())

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class WorkerServerMetadata extends BaseServerMetadata {
4141

4242
public static WorkerServerMetadata parseFromHeartBeat(final WorkerHeartBeat workerHeartBeat) {
4343
return WorkerServerMetadata.builder()
44+
.processId(workerHeartBeat.getProcessId())
4445
.serverStartupTime(workerHeartBeat.getStartupTime())
4546
.address(workerHeartBeat.getHost() + Constants.COLON + workerHeartBeat.getPort())
4647
.workerGroup(workerHeartBeat.getWorkerGroup())

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
public class MasterFailoverEvent extends AbstractSystemEvent {
3030

3131
private final MasterServerMetadata masterServerMetadata;
32+
// The time when the event occurred. This might be different at different nodes.
3233
private final Date eventTime;
3334

3435
private MasterFailoverEvent(final MasterServerMetadata masterServerMetadata,

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444

4545
import org.springframework.beans.factory.annotation.Autowired;
4646
import org.springframework.stereotype.Component;
47-
import org.springframework.transaction.PlatformTransactionManager;
4847

4948
@Slf4j
5049
@Component
@@ -65,9 +64,6 @@ public class FailoverCoordinator implements IFailoverCoordinator {
6564
@Autowired
6665
private WorkflowInstanceDao workflowInstanceDao;
6766

68-
@Autowired
69-
private PlatformTransactionManager platformTransactionManager;
70-
7167
@Autowired
7268
private WorkflowFailover workflowFailover;
7369

@@ -81,13 +77,21 @@ public void globalMasterFailover(final GlobalMasterFailoverEvent globalMasterFai
8177
final Optional<MasterServerMetadata> aliveMasterOptional =
8278
clusterManager.getMasterClusters().getServer(masterAddress);
8379
if (aliveMasterOptional.isPresent()) {
80+
// If the master is alive, then we use the alive master's startup time as the failover deadline.
8481
final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
8582
log.info("The master[{}] is alive, do global master failover on it", aliveMasterServerMetadata);
86-
doMasterFailover(aliveMasterServerMetadata.getAddress(),
87-
aliveMasterServerMetadata.getServerStartupTime());
83+
doMasterFailover(
84+
masterAddress,
85+
aliveMasterServerMetadata.getServerStartupTime(),
86+
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(
87+
masterAddress));
8888
} else {
89+
// If the master is not alive, then we use the event time as the failover deadline.
8990
log.info("The master[{}] is not alive, do global master failover on it", masterAddress);
90-
doMasterFailover(masterAddress, globalMasterFailoverEvent.getEventTime().getTime());
91+
doMasterFailover(
92+
masterAddress,
93+
globalMasterFailoverEvent.getEventTime().getTime(),
94+
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(masterAddress));
9195
}
9296
}
9397

@@ -99,53 +103,55 @@ public void globalMasterFailover(final GlobalMasterFailoverEvent globalMasterFai
99103
public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) {
100104
final MasterServerMetadata masterServerMetadata = masterFailoverEvent.getMasterServerMetadata();
101105
log.info("Master[{}] failover starting", masterServerMetadata);
106+
final String masterAddress = masterServerMetadata.getAddress();
102107

103108
final Optional<MasterServerMetadata> aliveMasterOptional =
104-
clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress());
109+
clusterManager.getMasterClusters().getServer(masterAddress);
105110
if (aliveMasterOptional.isPresent()) {
106111
final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
107112
if (aliveMasterServerMetadata.getServerStartupTime() == masterServerMetadata.getServerStartupTime()) {
108113
log.info("The master[{}] is alive, maybe it reconnect to registry skip failover", masterServerMetadata);
109-
} else {
110-
log.info("The master[{}] is alive, but the startup time is different, will failover on {}",
111-
masterServerMetadata,
112-
aliveMasterServerMetadata);
113-
doMasterFailover(aliveMasterServerMetadata.getAddress(),
114-
aliveMasterServerMetadata.getServerStartupTime());
114+
return;
115115
}
116-
} else {
117-
log.info("The master[{}] is not alive, will failover", masterServerMetadata);
118-
doMasterFailover(masterServerMetadata.getAddress(), masterServerMetadata.getServerStartupTime());
119116
}
117+
doMasterFailover(
118+
masterServerMetadata.getAddress(),
119+
masterFailoverEvent.getEventTime().getTime(),
120+
RegistryUtils.getFailoveredNodePath(
121+
masterServerMetadata.getAddress(),
122+
masterServerMetadata.getServerStartupTime(),
123+
masterServerMetadata.getProcessId()));
120124
}
121125

122126
/**
123127
* Do master failover.
124128
* <p> Will failover the workflow which is scheduled by the master and the workflow's fire time is before the maxWorkflowFireTime.
125129
*/
126-
private void doMasterFailover(final String masterAddress, final long masterStartupTime) {
130+
private void doMasterFailover(final String masterAddress,
131+
final long workflowFailoverDeadline,
132+
final String masterFailoverNodePath) {
127133
// We use lock to avoid multiple master failover at the same time.
128134
// Once the workflow has been failovered, then it's state will be changed to FAILOVER
129135
// Once the FAILOVER workflow has been refired, then it's host will be changed to the new master and have a new
130136
// start time.
131137
// So if a master has been failovered multiple times, there is no problem.
132138
final StopWatch failoverTimeCost = StopWatch.createStarted();
133-
registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath());
139+
registryClient.getLock(RegistryUtils.getMasterFailoverLockPath(masterAddress));
134140
try {
135-
final String failoverFinishedNodePath =
136-
RegistryUtils.getFailoverFinishedNodePath(masterAddress, masterStartupTime);
137-
if (registryClient.exists(failoverFinishedNodePath)) {
138-
log.error("The master[{}-{}] is exist at: {}, means it has already been failovered, skip failover",
141+
// If the master has already been failovered, then we skip the failover.
142+
if (registryClient.exists(masterFailoverNodePath)
143+
&& String.valueOf(workflowFailoverDeadline).equals(registryClient.get(masterFailoverNodePath))) {
144+
log.error("The master[{}/{}] is exist at: {}, means it has already been failovered, skip failover",
139145
masterAddress,
140-
masterStartupTime,
141-
failoverFinishedNodePath);
146+
workflowFailoverDeadline,
147+
masterFailoverNodePath);
142148
return;
143149
}
144150
final List<WorkflowInstance> needFailoverWorkflows =
145-
getFailoverWorkflowsForMaster(masterAddress, new Date(masterStartupTime));
151+
getFailoverWorkflowsForMaster(masterAddress, new Date(workflowFailoverDeadline));
146152
needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow);
153+
registryClient.persist(masterFailoverNodePath, String.valueOf(workflowFailoverDeadline));
147154
failoverTimeCost.stop();
148-
registryClient.persist(failoverFinishedNodePath, String.valueOf(System.currentTimeMillis()));
149155
log.info("Master[{}] failover {} workflows finished, cost: {}/ms",
150156
masterAddress,
151157
needFailoverWorkflows.size(),
@@ -190,28 +196,30 @@ public void failoverWorker(final WorkerFailoverEvent workerFailoverEvent) {
190196
final WorkerServerMetadata aliveWorkerServerMetadata = aliveWorkerOptional.get();
191197
if (aliveWorkerServerMetadata.getServerStartupTime() == workerServerMetadata.getServerStartupTime()) {
192198
log.info("The worker[{}] is alive, maybe it reconnect to registry skip failover", workerServerMetadata);
193-
} else {
194-
log.info("The worker[{}] is alive, but the startup time is different, will failover on {}",
195-
workerServerMetadata,
196-
aliveWorkerServerMetadata);
197-
doWorkerFailover(aliveWorkerServerMetadata.getAddress(),
198-
aliveWorkerServerMetadata.getServerStartupTime());
199+
return;
199200
}
200-
} else {
201-
log.info("The worker[{}] is not alive, will failover", workerServerMetadata);
202-
doWorkerFailover(workerServerMetadata.getAddress(), workerServerMetadata.getServerStartupTime());
203201
}
202+
doWorkerFailover(
203+
workerServerMetadata.getAddress(),
204+
System.currentTimeMillis(),
205+
RegistryUtils.getFailoveredNodePath(
206+
workerServerMetadata.getAddress(),
207+
workerServerMetadata.getServerStartupTime(),
208+
workerServerMetadata.getProcessId()));
204209
}
205210

206-
private void doWorkerFailover(final String workerAddress, final long workerCrashTime) {
211+
private void doWorkerFailover(final String workerAddress,
212+
final long taskFailoverDeadline,
213+
final String workerFailoverNodePath) {
207214
final StopWatch failoverTimeCost = StopWatch.createStarted();
215+
// we don't check the workerFailoverNodePath exist, since the worker may be failovered multiple master
208216

209217
final List<ITaskExecutionRunnable> needFailoverTasks =
210-
getFailoverTaskForWorker(workerAddress, new Date(workerCrashTime));
218+
getFailoverTaskForWorker(workerAddress, new Date(taskFailoverDeadline));
211219
needFailoverTasks.forEach(taskFailover::failoverTask);
212220

213221
registryClient.persist(
214-
RegistryUtils.getFailoverFinishedNodePath(workerAddress, workerCrashTime),
222+
workerFailoverNodePath,
215223
String.valueOf(System.currentTimeMillis()));
216224
failoverTimeCost.stop();
217225
log.info("Worker[{}] failover {} tasks finished, cost: {}/ms",
@@ -221,7 +229,7 @@ private void doWorkerFailover(final String workerAddress, final long workerCrash
221229
}
222230

223231
private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String workerAddress,
224-
final Date workerCrashTime) {
232+
final Date taskFailoverDeadline) {
225233
return workflowRepository.getAll()
226234
.stream()
227235
.map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph)
@@ -237,7 +245,7 @@ private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String worke
237245
// The submitTime should not be null.
238246
// This is a bad case unless someone manually set the submitTime to null.
239247
final Date submitTime = taskExecutionRunnable.getTaskInstance().getSubmitTime();
240-
return submitTime != null && submitTime.before(workerCrashTime);
248+
return submitTime != null && submitTime.before(taskFailoverDeadline);
241249
})
242250
.collect(Collectors.toList());
243251
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public MasterHeartBeat getHeartBeat() {
8686

8787
@Override
8888
public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) {
89-
final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat);
89+
final String failoverNodePath = RegistryUtils.getFailoveredNodePath(masterHeartBeat);
9090
if (registryClient.exists(failoverNodePath)) {
9191
log.warn("The master: {} is under {}, means it has been failover will close myself",
9292
masterHeartBeat,

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public enum RegistryNodeType {
2626

2727
FAILOVER_FINISH_NODES("FailoverFinishNodes", "/nodes/failover-finish-nodes"),
2828

29+
GLOBAL_MASTER_FAILOVER_LOCK("GlobalMasterFailoverLock", "/lock/global-master-failover"),
2930
MASTER("Master", "/nodes/master"),
3031
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
3132
MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,30 @@
2020
import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
2121
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
2222

23+
import com.google.common.base.Preconditions;
24+
2325
public class RegistryUtils {
2426

25-
public static String getFailoverFinishedNodePath(final BaseHeartBeat baseHeartBeat) {
26-
return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
27-
baseHeartBeat.getStartupTime());
27+
public static String getMasterFailoverLockPath(final String masterAddress) {
28+
Preconditions.checkNotNull(masterAddress, "master address cannot be null");
29+
return RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + masterAddress;
30+
}
31+
32+
public static String getFailoveredNodePathWhichStartupTimeIsUnknown(final String serverAddress) {
33+
return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + serverAddress + "-" + "unknown" + "-"
34+
+ "unknown";
35+
}
36+
37+
public static String getFailoveredNodePath(final BaseHeartBeat baseHeartBeat) {
38+
return getFailoveredNodePath(
39+
baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
40+
baseHeartBeat.getStartupTime(),
41+
baseHeartBeat.getProcessId());
2842
}
2943

30-
public static String getFailoverFinishedNodePath(final String masterAddress, final long masterStartupTime) {
31-
return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + masterAddress + "-" + masterStartupTime;
44+
public static String getFailoveredNodePath(final String serverAddress, final long serverStartupTime,
45+
final int processId) {
46+
return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + serverAddress + "-" + serverStartupTime
47+
+ "-" + processId;
3248
}
3349
}

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public WorkerHeartBeat getHeartBeat() {
8585

8686
@Override
8787
public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) {
88-
final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat);
88+
final String failoverNodePath = RegistryUtils.getFailoveredNodePath(workerHeartBeat);
8989
if (registryClient.exists(failoverNodePath)) {
9090
log.warn("The worker: {} is under {}, means it has been failover will close myself",
9191
workerHeartBeat,

0 commit comments

Comments (0)