|
20 | 20 | import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS; |
21 | 21 |
|
22 | 22 | import org.apache.dolphinscheduler.common.IStoppable; |
23 | | -import org.apache.dolphinscheduler.common.constants.Constants; |
24 | | -import org.apache.dolphinscheduler.common.enums.ServerStatus; |
25 | 23 | import org.apache.dolphinscheduler.common.model.Server; |
26 | | -import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; |
27 | 24 | import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
28 | 25 | import org.apache.dolphinscheduler.common.utils.JSONUtils; |
29 | 26 | import org.apache.dolphinscheduler.extract.base.utils.Host; |
@@ -84,27 +81,18 @@ public void start() { |
84 | 81 | } |
85 | 82 | } |
86 | 83 |
|
87 | | - private void registry() throws InterruptedException { |
88 | | - WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat(); |
89 | | - while (ServerStatus.BUSY.equals(workerHeartBeat.getServerStatus())) { |
90 | | - log.warn("Worker node is BUSY: {}", workerHeartBeat); |
91 | | - workerHeartBeat = workerHeartBeatTask.getHeartBeat(); |
92 | | - Thread.sleep(SLEEP_TIME_MILLIS); |
93 | | - } |
| 84 | + private void registry() { |
94 | 85 | String workerRegistryPath = workerConfig.getWorkerRegistryPath(); |
95 | 86 | // remove before persist |
96 | 87 | registryClient.remove(workerRegistryPath); |
97 | | - registryClient.persistEphemeral(workerRegistryPath, JSONUtils.toJsonString(workerHeartBeat)); |
| 88 | + registryClient.persistEphemeral(workerRegistryPath, JSONUtils.toJsonString(workerHeartBeatTask.getHeartBeat())); |
98 | 89 | log.info("Worker node: {} registry to registry center {} successfully", workerConfig.getWorkerAddress(), |
99 | 90 | workerRegistryPath); |
100 | 91 |
|
101 | 92 | while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), RegistryNodeType.WORKER)) { |
102 | 93 | ThreadUtils.sleep(SLEEP_TIME_MILLIS); |
103 | 94 | } |
104 | 95 |
|
105 | | - // sleep 1s, waiting master failover remove |
106 | | - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
107 | | - |
108 | 96 | workerHeartBeatTask.start(); |
109 | 97 | log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress()); |
110 | 98 | } |
|
0 commit comments