Skip to content

Commit 3cb9330

Browse files
ruanwenjun and SbloodyS authored
[Improvement] Optimize the AbstractHAServer implementation (#16810)
Co-authored-by: xiangzihao <[email protected]>
1 parent 610f7fe commit 3cb9330

File tree

7 files changed

+106
-67
lines changed

7 files changed

+106
-67
lines changed

dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

Lines changed: 42 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,14 +18,19 @@
1818
package org.apache.dolphinscheduler.alert;
1919

2020
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
21+
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
22+
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
23+
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
2124
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
25+
import org.apache.dolphinscheduler.alert.service.AlertHAServer;
2226
import org.apache.dolphinscheduler.common.CommonConfiguration;
2327
import org.apache.dolphinscheduler.common.constants.Constants;
2428
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
2529
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
2630
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2731
import org.apache.dolphinscheduler.dao.DaoConfiguration;
2832
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
33+
import org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;
2934

3035
import javax.annotation.PostConstruct;
3136
import javax.annotation.PreDestroy;
@@ -44,6 +49,18 @@
4449
@SpringBootApplication
4550
public class AlertServer {
4651

52+
@Autowired
53+
private AlertRpcServer alertRpcServer;
54+
55+
@Autowired
56+
private AlertPluginManager alertPluginManager;
57+
58+
@Autowired
59+
private AlertRegistryClient alertRegistryClient;
60+
61+
@Autowired
62+
private AlertHAServer alertHAServer;
63+
4764
@Autowired
4865
private AlertBootstrapService alertBootstrapService;
4966

@@ -58,7 +75,25 @@ public static void main(String[] args) {
5875
public void run() {
5976
ServerLifeCycleManager.toRunning();
6077
log.info("AlertServer is staring ...");
61-
alertBootstrapService.start();
78+
alertPluginManager.start();
79+
alertRpcServer.start();
80+
alertRegistryClient.start();
81+
82+
alertHAServer.addServerStatusChangeListener(new AbstractServerStatusChangeListener() {
83+
84+
@Override
85+
public void changeToActive() {
86+
alertBootstrapService.start();
87+
}
88+
89+
@Override
90+
public void changeToStandBy() {
91+
close();
92+
}
93+
});
94+
95+
alertHAServer.start();
96+
6297
log.info("AlertServer is started ...");
6398
}
6499

@@ -73,7 +108,12 @@ public void close() {
73108
return;
74109
}
75110
log.info("AlertServer is stopping, cause: {}", cause);
76-
alertBootstrapService.close();
111+
try (
112+
final AlertRpcServer ignore = alertRpcServer;
113+
final AlertRegistryClient ignore1 = alertRegistryClient;
114+
final AlertHAServer ignore2 = alertHAServer;
115+
final AlertBootstrapService ignore3 = alertBootstrapService;) {
116+
}
77117
// thread sleep 3 seconds for thread quietly stop
78118
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
79119
log.info("AlertServer stopped, cause: {}", cause);

dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java

Lines changed: 2 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.alert.service;
1919

20-
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
2120
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
2221
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
2322

@@ -32,39 +31,21 @@
3231
@Service
3332
public final class AlertBootstrapService implements AutoCloseable {
3433

35-
private final AlertRpcServer alertRpcServer;
36-
37-
private final AlertRegistryClient alertRegistryClient;
38-
39-
private final AlertPluginManager alertPluginManager;
40-
41-
private final AlertHAServer alertHAServer;
42-
4334
private final AlertEventFetcher alertEventFetcher;
4435

4536
private final AlertEventLoop alertEventLoop;
4637

4738
public AlertBootstrapService(AlertRpcServer alertRpcServer,
4839
AlertRegistryClient alertRegistryClient,
49-
AlertPluginManager alertPluginManager,
5040
AlertHAServer alertHAServer,
5141
AlertEventFetcher alertEventFetcher,
5242
AlertEventLoop alertEventLoop) {
53-
this.alertRpcServer = alertRpcServer;
54-
this.alertRegistryClient = alertRegistryClient;
55-
this.alertPluginManager = alertPluginManager;
56-
this.alertHAServer = alertHAServer;
5743
this.alertEventFetcher = alertEventFetcher;
5844
this.alertEventLoop = alertEventLoop;
5945
}
6046

6147
public void start() {
6248
log.info("AlertBootstrapService starting...");
63-
alertPluginManager.start();
64-
alertRpcServer.start();
65-
alertRegistryClient.start();
66-
alertHAServer.start();
67-
6849
alertEventFetcher.start();
6950
alertEventLoop.start();
7051
log.info("AlertBootstrapService started...");
@@ -73,15 +54,8 @@ public void start() {
7354
@Override
7455
public void close() {
7556
log.info("AlertBootstrapService stopping...");
76-
try (
77-
AlertRpcServer closedAlertRpcServer = alertRpcServer;
78-
AlertRegistryClient closedAlertRegistryClient = alertRegistryClient) {
79-
// close resource
80-
alertEventFetcher.shutdown();
81-
82-
alertEventLoop.shutdown();
83-
alertHAServer.shutdown();
84-
}
57+
alertEventFetcher.shutdown();
58+
alertEventLoop.shutdown();
8559
log.info("AlertBootstrapService stopped...");
8660
}
8761
}

dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertHAServer.java

Lines changed: 13 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.alert.service;
1919

20+
import org.apache.dolphinscheduler.alert.config.AlertConfig;
2021
import org.apache.dolphinscheduler.registry.api.Registry;
2122
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
2223
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
@@ -29,8 +30,18 @@
2930
@Component
3031
public class AlertHAServer extends AbstractHAServer {
3132

32-
public AlertHAServer(Registry registry) {
33-
super(registry, RegistryNodeType.ALERT_LOCK.getRegistryPath());
33+
public AlertHAServer(final Registry registry, final AlertConfig alertConfig) {
34+
super(registry, RegistryNodeType.ALERT_HA_LEADER.getRegistryPath(), alertConfig.getAlertServerAddress());
3435
}
3536

37+
@Override
38+
public void start() {
39+
super.start();
40+
log.info("AlertHAServer started...");
41+
}
42+
43+
@Override
44+
public void close() {
45+
log.info("AlertHAServer shutdown...");
46+
}
3647
}

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -24,15 +24,15 @@
2424
@AllArgsConstructor
2525
public enum RegistryNodeType {
2626

27-
ALL_SERVERS("nodes", "/nodes"),
2827
MASTER("Master", "/nodes/master"),
29-
MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"),
3028
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
3129
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"),
3230
MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator", "/lock/master-serial-workflow-coordinator"),
31+
3332
WORKER("Worker", "/nodes/worker"),
33+
3434
ALERT_SERVER("AlertServer", "/nodes/alert-server"),
35-
ALERT_LOCK("AlertNodeLock", "/lock/alert");
35+
ALERT_HA_LEADER("AlertHALeader", "/nodes/alert-server-ha-leader");
3636

3737
private final String name;
3838

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java

Lines changed: 43 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -17,13 +17,12 @@
1717

1818
package org.apache.dolphinscheduler.registry.api.ha;
1919

20-
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
20+
import static com.google.common.base.Preconditions.checkNotNull;
21+
2122
import org.apache.dolphinscheduler.registry.api.Event;
2223
import org.apache.dolphinscheduler.registry.api.Registry;
2324

2425
import java.util.List;
25-
import java.util.concurrent.ScheduledExecutorService;
26-
import java.util.concurrent.TimeUnit;
2726

2827
import lombok.extern.slf4j.Slf4j;
2928

@@ -34,38 +33,41 @@ public abstract class AbstractHAServer implements HAServer {
3433

3534
private final Registry registry;
3635

37-
private final String serverPath;
36+
private final String selectorPath;
37+
38+
private final String serverIdentify;
3839

3940
private ServerStatus serverStatus;
4041

4142
private final List<ServerStatusChangeListener> serverStatusChangeListeners;
4243

43-
public AbstractHAServer(Registry registry, String serverPath) {
44+
public AbstractHAServer(final Registry registry, final String selectorPath, final String serverIdentify) {
4445
this.registry = registry;
45-
this.serverPath = serverPath;
46+
this.selectorPath = checkNotNull(selectorPath);
47+
this.serverIdentify = checkNotNull(serverIdentify);
4648
this.serverStatus = ServerStatus.STAND_BY;
4749
this.serverStatusChangeListeners = Lists.newArrayList(new DefaultServerStatusChangeListener());
4850
}
4951

5052
@Override
5153
public void start() {
52-
registry.subscribe(serverPath, event -> {
54+
registry.subscribe(selectorPath, event -> {
5355
if (Event.Type.REMOVE.equals(event.type())) {
54-
if (isActive() && !participateElection()) {
56+
if (serverIdentify.equals(event.data())) {
5557
statusChange(ServerStatus.STAND_BY);
58+
} else {
59+
if (participateElection()) {
60+
statusChange(ServerStatus.ACTIVE);
61+
}
5662
}
5763
}
5864
});
59-
ScheduledExecutorService electionSelectionThread =
60-
ThreadUtils.newSingleDaemonScheduledExecutorService("election-selection-thread");
61-
electionSelectionThread.schedule(() -> {
62-
if (isActive()) {
63-
return;
64-
}
65-
if (participateElection()) {
66-
statusChange(ServerStatus.ACTIVE);
67-
}
68-
}, 10, TimeUnit.SECONDS);
65+
66+
if (participateElection()) {
67+
statusChange(ServerStatus.ACTIVE);
68+
} else {
69+
log.info("Server {} is standby", serverIdentify);
70+
}
6971
}
7072

7173
@Override
@@ -75,7 +77,22 @@ public boolean isActive() {
7577

7678
@Override
7779
public boolean participateElection() {
78-
return registry.acquireLock(serverPath, 3_000);
80+
final String electionLock = selectorPath + "-lock";
81+
try {
82+
if (registry.acquireLock(electionLock)) {
83+
if (!registry.exists(selectorPath)) {
84+
registry.put(selectorPath, serverIdentify, true);
85+
return true;
86+
}
87+
return serverIdentify.equals(registry.get(selectorPath));
88+
}
89+
return false;
90+
} catch (Exception e) {
91+
log.error("participate election error", e);
92+
return false;
93+
} finally {
94+
registry.releaseLock(electionLock);
95+
}
7996
}
8097

8198
@Override
@@ -88,18 +105,15 @@ public ServerStatus getServerStatus() {
88105
return serverStatus;
89106
}
90107

91-
@Override
92-
public void shutdown() {
93-
if (isActive()) {
94-
registry.releaseLock(serverPath);
95-
}
96-
}
97-
98108
private void statusChange(ServerStatus targetStatus) {
109+
final ServerStatus originStatus = serverStatus;
110+
serverStatus = targetStatus;
99111
synchronized (this) {
100-
ServerStatus originStatus = serverStatus;
101-
serverStatus = targetStatus;
102-
serverStatusChangeListeners.forEach(listener -> listener.change(originStatus, serverStatus));
112+
try {
113+
serverStatusChangeListeners.forEach(listener -> listener.change(originStatus, serverStatus));
114+
} catch (Exception ex) {
115+
log.error("Trigger ServerStatusChangeListener from {} -> {} error", originStatus, targetStatus, ex);
116+
}
103117
}
104118
}
105119
}

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractServerStatusChangeListener.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@ public abstract class AbstractServerStatusChangeListener implements ServerStatus
2424

2525
@Override
2626
public void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus) {
27-
log.info("The status change from {} to {}.", originStatus, currentStatus);
2827
if (originStatus == HAServer.ServerStatus.ACTIVE) {
2928
if (currentStatus == HAServer.ServerStatus.STAND_BY) {
3029
changeToStandBy();

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/HAServer.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@
2121
* Interface for HA server, used to select a active server from multiple servers.
2222
* In HA mode, there are multiple servers, only one server is active, others are standby.
2323
*/
24-
public interface HAServer {
24+
public interface HAServer extends AutoCloseable {
2525

2626
/**
2727
* Start the server.
@@ -57,7 +57,8 @@ public interface HAServer {
5757
/**
5858
* Shutdown the server, release resources.
5959
*/
60-
void shutdown();
60+
@Override
61+
void close();
6162

6263
enum ServerStatus {
6364
ACTIVE,

0 commit comments

Comments
 (0)