Skip to content

Commit 25108c8

Browse files
authored
[Improvement-16982][Master] When master startup, initialize the cluster from registry (#16983)
1 parent 899fb37 commit 25108c8

File tree

6 files changed

+139
-31
lines changed

6 files changed

+139
-31
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.dolphinscheduler.server.master.cluster;
1919

20+
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
21+
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
22+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2023
import org.apache.dolphinscheduler.registry.api.RegistryClient;
2124
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
2225

@@ -36,6 +39,9 @@ public class ClusterManager {
3639
@Getter
3740
private WorkerClusters workerClusters;
3841

42+
@Autowired
43+
private MasterSlotManager masterSlotManager;
44+
3945
@Autowired
4046
private WorkerGroupChangeNotifier workerGroupChangeNotifier;
4147

@@ -48,11 +54,48 @@ public ClusterManager() {
4854
}
4955

5056
public void start() {
57+
initializeMasterClusters();
58+
initializeWorkerClusters();
59+
log.info("ClusterManager started...");
60+
}
61+
62+
/**
63+
* Initialize the master clusters.
64+
* <p> 1. Register master slot listener once master clusters changed.
65+
* <p> 2. Fetch master nodes from registry.
66+
* <p> 3. Subscribe the master change event.
67+
*/
68+
private void initializeMasterClusters() {
69+
this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
70+
71+
registryClient.getServerList(RegistryNodeType.MASTER).forEach(server -> {
72+
final MasterHeartBeat masterHeartBeat =
73+
JSONUtils.parseObject(server.getHeartBeatInfo(), MasterHeartBeat.class);
74+
masterClusters.onServerAdded(MasterServerMetadata.parseFromHeartBeat(masterHeartBeat));
75+
});
76+
log.info("Initialized MasterClusters: {}", JSONUtils.toPrettyJsonString(masterClusters.getServers()));
77+
5178
this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters);
79+
}
80+
81+
/**
82+
* Initialize the worker clusters.
83+
* <p> 1. Fetch worker nodes from registry.
84+
* <p> 2. Register worker group change notifier once worker clusters changed.
85+
* <p> 3. Subscribe the worker change event.
86+
*/
87+
private void initializeWorkerClusters() {
88+
registryClient.getServerList(RegistryNodeType.WORKER).forEach(server -> {
89+
final WorkerHeartBeat workerHeartBeat =
90+
JSONUtils.parseObject(server.getHeartBeatInfo(), WorkerHeartBeat.class);
91+
workerClusters.onServerAdded(WorkerServerMetadata.parseFromHeartBeat(workerHeartBeat));
92+
});
93+
log.info("Initialized WorkerClusters: {}", JSONUtils.toPrettyJsonString(workerClusters.getServers()));
94+
5295
this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters);
96+
5397
this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters);
5498
this.workerGroupChangeNotifier.start();
55-
log.info("ClusterManager started...");
5699
}
57100

58101
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.cluster;
19+
20+
import java.util.List;
21+
22+
public interface IMasterSlotChangeListener {
23+
24+
void onMasterSlotChanged(final List<MasterServerMetadata> normalMasterServers);
25+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.dolphinscheduler.server.master.cluster;
1919

20+
import static com.google.common.base.Preconditions.checkNotNull;
21+
2022
import org.apache.dolphinscheduler.common.constants.Constants;
2123
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
2224

@@ -32,6 +34,7 @@
3234
public class MasterServerMetadata extends BaseServerMetadata implements Comparable<MasterServerMetadata> {
3335

3436
public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
37+
checkNotNull(masterHeartBeat);
3538
return MasterServerMetadata.builder()
3639
.processId(masterHeartBeat.getProcessId())
3740
.serverStartupTime(masterHeartBeat.getStartupTime())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.cluster;
19+
20+
import java.util.List;
21+
22+
public class MasterSlotChangeListenerAdaptor
23+
implements
24+
IMasterSlotChangeListener,
25+
IClusters.IClustersChangeListener<MasterServerMetadata> {
26+
27+
private final MasterSlotManager masterSlotManager;
28+
29+
private final MasterClusters masterClusters;
30+
31+
public MasterSlotChangeListenerAdaptor(final MasterSlotManager masterSlotManager,
32+
final MasterClusters masterClusters) {
33+
this.masterSlotManager = masterSlotManager;
34+
this.masterClusters = masterClusters;
35+
}
36+
37+
@Override
38+
public void onMasterSlotChanged(final List<MasterServerMetadata> normalMasterServers) {
39+
masterSlotManager.doReBalance(normalMasterServers);
40+
}
41+
42+
@Override
43+
public void onServerAdded(MasterServerMetadata server) {
44+
onMasterSlotChanged(masterClusters.getNormalServers());
45+
}
46+
47+
@Override
48+
public void onServerRemove(MasterServerMetadata server) {
49+
onMasterSlotChanged(masterClusters.getNormalServers());
50+
}
51+
52+
@Override
53+
public void onServerUpdate(MasterServerMetadata server) {
54+
onMasterSlotChanged(masterClusters.getNormalServers());
55+
}
56+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,34 +29,14 @@
2929
@Component
3030
public class MasterSlotManager implements IMasterSlotReBalancer {
3131

32-
private final MasterClusters masterClusters;
33-
3432
private final MasterConfig masterConfig;
3533

3634
private volatile int currentSlot = -1;
3735

3836
private volatile int totalSlots = 0;
3937

40-
public MasterSlotManager(ClusterManager clusterManager, MasterConfig masterConfig) {
38+
public MasterSlotManager(final MasterConfig masterConfig) {
4139
this.masterConfig = masterConfig;
42-
this.masterClusters = clusterManager.getMasterClusters();
43-
this.masterClusters.registerListener(new IClusters.IClustersChangeListener<MasterServerMetadata>() {
44-
45-
@Override
46-
public void onServerAdded(MasterServerMetadata server) {
47-
doReBalance(masterClusters.getNormalServers());
48-
}
49-
50-
@Override
51-
public void onServerRemove(MasterServerMetadata server) {
52-
doReBalance(masterClusters.getNormalServers());
53-
}
54-
55-
@Override
56-
public void onServerUpdate(MasterServerMetadata server) {
57-
doReBalance(masterClusters.getNormalServers());
58-
}
59-
});
6040
}
6141

6242
/**

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,16 @@ class MasterSlotManagerTest {
2929

3030
private MasterSlotManager masterSlotManager;
3131

32-
private ClusterManager clusterManager;
32+
private MasterClusters masterClusters;
3333

3434
private MasterConfig masterConfig;
3535

3636
@BeforeEach
3737
public void setUp() {
38-
clusterManager = new ClusterManager();
38+
masterClusters = new MasterClusters();
3939
masterConfig = new MasterConfig();
4040
masterConfig.setMasterAddress("127.0.0.1:5678");
41-
masterSlotManager = new MasterSlotManager(clusterManager, masterConfig);
41+
masterSlotManager = new MasterSlotManager(masterConfig);
4242
MasterServerMetadata master1 = MasterServerMetadata.builder()
4343
.cpuUsage(0.2)
4444
.memoryUsage(0.4)
@@ -63,10 +63,11 @@ public void setUp() {
6363
.serverStatus(ServerStatus.BUSY)
6464
.address("127.0.0.4:5679")
6565
.build();
66-
clusterManager.getMasterClusters().onServerAdded(master1);
67-
clusterManager.getMasterClusters().onServerAdded(master2);
68-
clusterManager.getMasterClusters().onServerAdded(master3);
69-
clusterManager.getMasterClusters().onServerAdded(master4);
66+
this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
67+
masterClusters.onServerAdded(master1);
68+
masterClusters.onServerAdded(master2);
69+
masterClusters.onServerAdded(master3);
70+
masterClusters.onServerAdded(master4);
7071
}
7172

7273
@Test
@@ -98,8 +99,8 @@ void doReBalance() {
9899
.serverStatus(ServerStatus.BUSY)
99100
.address("127.0.0.4:5679")
100101
.build();
101-
clusterManager.getMasterClusters().onServerRemove(master2);
102-
clusterManager.getMasterClusters().onServerRemove(master3);
102+
masterClusters.onServerRemove(master2);
103+
masterClusters.onServerRemove(master3);
103104
// After doReBalance, the total master slots should be 2
104105
assertThat(masterSlotManager.getTotalMasterSlots()).isEqualTo(2);
105106
}

0 commit comments

Comments
 (0)