Skip to content

Commit 0b0f97b

Browse files
committed
Add MasterCoordinatorListener
1 parent 42f3356 commit 0b0f97b

File tree

5 files changed

+46
-33
lines changed

5 files changed

+46
-33
lines changed

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/MasterCoordinator.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,7 @@ public MasterCoordinator(final Registry registry, final MasterConfig masterConfi
4343
registry,
4444
RegistryNodeType.MASTER_COORDINATOR.getRegistryPath(),
4545
masterConfig.getMasterAddress());
46-
47-
addServerStatusChangeListener(new AbstractServerStatusChangeListener() {
48-
49-
@Override
50-
public void changeToActive() {
51-
onActive();
52-
}
53-
54-
@Override
55-
public void changeToStandBy() {
56-
onStandBy();
57-
}
58-
});
46+
addServerStatusChangeListener(new MasterCoordinatorListener(taskGroupCoordinator));
5947
}
6048

6149
@Override
@@ -70,12 +58,23 @@ public void close() {
7058
log.info("MasterCoordinator shutdown...");
7159
}
7260

73-
private void onActive() {
74-
taskGroupCoordinator.start();
75-
}
61+
public static class MasterCoordinatorListener extends AbstractServerStatusChangeListener {
7662

77-
private void onStandBy() {
78-
taskGroupCoordinator.close();
63+
private final TaskGroupCoordinator taskGroupCoordinator;
64+
65+
public MasterCoordinatorListener(TaskGroupCoordinator taskGroupCoordinator) {
66+
this.taskGroupCoordinator = taskGroupCoordinator;
67+
}
68+
69+
@Override
70+
public void changeToActive() {
71+
taskGroupCoordinator.start();
72+
}
73+
74+
@Override
75+
public void changeToStandBy() {
76+
taskGroupCoordinator.close();
77+
}
7978
}
8079

8180
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
489489
@Override
490490
public synchronized void close() {
491491
if (!flag) {
492-
log.error("TaskGroupCoordinator is already closed");
492+
log.warn("TaskGroupCoordinator is already closed");
493493
return;
494494
}
495495
flag = false;

dolphinscheduler-master/src/test/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
</appender>
6969

7070
<!-- We use OFF here to avoid too many exception log in CI -->
71-
<root level="INFO">
71+
<root level="OFF">
7272
<appender-ref ref="STDOUT"/>
7373
<appender-ref ref="TASKLOGFILE"/>
7474
<appender-ref ref="MASTERLOGFILE"/>

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static com.google.common.base.Preconditions.checkNotNull;
2121

22+
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2223
import org.apache.dolphinscheduler.registry.api.Event;
2324
import org.apache.dolphinscheduler.registry.api.Registry;
2425

@@ -41,6 +42,10 @@ public abstract class AbstractHAServer implements HAServer {
4142

4243
private final List<ServerStatusChangeListener> serverStatusChangeListeners;
4344

45+
private static final long DEFAULT_RETRY_INTERVAL = 5_000;
46+
47+
private static final int DEFAULT_MAX_RETRY_TIMES = 20;
48+
4449
public AbstractHAServer(final Registry registry, final String selectorPath, final String serverIdentify) {
4550
this.registry = registry;
4651
this.selectorPath = checkNotNull(selectorPath);
@@ -78,21 +83,30 @@ public boolean isActive() {
7883
@Override
7984
public boolean participateElection() {
8085
final String electionLock = selectorPath + "-lock";
81-
try {
82-
if (registry.acquireLock(electionLock)) {
83-
if (!registry.exists(selectorPath)) {
84-
registry.put(selectorPath, serverIdentify, true);
85-
return true;
86+
// If meet exception during participate election, will retry.
87+
// This can avoid the situation that the server is not elected as leader due to network jitter.
88+
for (int i = 0; i < DEFAULT_MAX_RETRY_TIMES; i++) {
89+
try {
90+
try {
91+
if (registry.acquireLock(electionLock)) {
92+
if (!registry.exists(selectorPath)) {
93+
registry.put(selectorPath, serverIdentify, true);
94+
return true;
95+
}
96+
return serverIdentify.equals(registry.get(selectorPath));
97+
}
98+
return false;
99+
} finally {
100+
registry.releaseLock(electionLock);
86101
}
87-
return serverIdentify.equals(registry.get(selectorPath));
102+
} catch (Exception e) {
103+
log.error("Participate election error, meet an exception, will retry after {}ms",
104+
DEFAULT_RETRY_INTERVAL, e);
105+
ThreadUtils.sleep(DEFAULT_RETRY_INTERVAL);
88106
}
89-
return false;
90-
} catch (Exception e) {
91-
log.error("participate election error", e);
92-
return false;
93-
} finally {
94-
registry.releaseLock(electionLock);
95107
}
108+
throw new IllegalStateException(
109+
"Participate election failed after retry " + DEFAULT_MAX_RETRY_TIMES + " times");
96110
}
97111

98112
@Override

dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/ServerStatusChangeListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@
1919

2020
public interface ServerStatusChangeListener {
2121

22-
void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus);
22+
void change(final HAServer.ServerStatus originStatus, final HAServer.ServerStatus currentStatus);
2323

2424
}

0 commit comments

Comments
 (0)