Skip to content

Commit 5b11fe8

Browse files
kevin.cyjwsry
authored and committed
[FRS-72] Fix the heartbeat issue caused by zookeeper restart (manager may fail to remove lost worker after restarting zookeeper)
1 parent 59ecca7 commit 5b11fe8

File tree

16 files changed

+223
-164
lines changed

16 files changed

+223
-164
lines changed

shuffle-coordinator/src/main/java/com/alibaba/flink/shuffle/client/ShuffleManagerClientImpl.java

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,6 @@
4343
import com.alibaba.flink.shuffle.core.ids.MapPartitionID;
4444
import com.alibaba.flink.shuffle.rpc.RemoteShuffleRpcService;
4545
import com.alibaba.flink.shuffle.rpc.RpcTargetAddress;
46-
import com.alibaba.flink.shuffle.rpc.executor.ScheduledExecutorServiceAdapter;
4746

4847
import org.slf4j.Logger;
4948
import org.slf4j.LoggerFactory;
@@ -157,7 +156,7 @@ public ShuffleManagerClientImpl(
157156
heartbeatService.createHeartbeatManagerSender(
158157
new InstanceID(jobID.getId()),
159158
new ManagerHeartbeatListener(),
160-
new ScheduledExecutorServiceAdapter(mainThreadExecutor),
159+
mainThreadExecutor,
161160
LOG);
162161
}
163162

@@ -340,7 +339,7 @@ public void receiveHeartbeat(InstanceID instanceID, Void heartbeatPayload) {
340339

341340
@Override
342341
public void requestHeartbeat(InstanceID instanceID, Void heartbeatPayload) {
343-
heartbeatToShuffleManager();
342+
mainThreadExecutor.execute(() -> heartbeatToShuffleManager());
344343
}
345344
});
346345

@@ -628,11 +627,17 @@ private class ManagerHeartbeatListener implements HeartbeatListener<Void, Void>
628627

629628
@Override
630629
public void notifyHeartbeatTimeout(InstanceID instanceID) {
631-
LOG.info("Timeout with remote shuffle manager {}", instanceID);
632-
if (establishedConnection != null
633-
&& establishedConnection.getResponse().getInstanceID().equals(instanceID)) {
634-
reconnectToShuffleManager(new Exception("Heartbeat timeout"));
635-
}
630+
mainThreadExecutor.execute(
631+
() -> {
632+
LOG.info("Timeout with remote shuffle manager {}", instanceID);
633+
if (establishedConnection != null
634+
&& establishedConnection
635+
.getResponse()
636+
.getInstanceID()
637+
.equals(instanceID)) {
638+
reconnectToShuffleManager(new Exception("Heartbeat timeout"));
639+
}
640+
});
636641
}
637642

638643
@Override

shuffle-coordinator/src/main/java/com/alibaba/flink/shuffle/coordinator/heartbeat/HeartbeatManagerImpl.java

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
package com.alibaba.flink.shuffle.coordinator.heartbeat;
1818

1919
import com.alibaba.flink.shuffle.core.ids.InstanceID;
20-
import com.alibaba.flink.shuffle.rpc.executor.ScheduledExecutor;
2120

2221
import org.slf4j.Logger;
2322

2423
import javax.annotation.concurrent.ThreadSafe;
2524

2625
import java.util.Map;
2726
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.ScheduledExecutorService;
2828

2929
import static com.alibaba.flink.shuffle.common.utils.CommonUtils.checkArgument;
3030
import static com.alibaba.flink.shuffle.common.utils.CommonUtils.checkNotNull;
@@ -52,7 +52,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
5252
private final HeartbeatListener<I, O> heartbeatListener;
5353

5454
/** Executor service used to run heartbeat timeout notifications. */
55-
private final ScheduledExecutor mainThreadExecutor;
55+
private final ScheduledExecutorService scheduledExecutor;
5656

5757
protected final Logger log;
5858

@@ -68,13 +68,13 @@ public HeartbeatManagerImpl(
6868
long heartbeatTimeoutIntervalMs,
6969
InstanceID ownInstanceID,
7070
HeartbeatListener<I, O> heartbeatListener,
71-
ScheduledExecutor mainThreadExecutor,
71+
ScheduledExecutorService scheduledExecutor,
7272
Logger log) {
7373
this(
7474
heartbeatTimeoutIntervalMs,
7575
ownInstanceID,
7676
heartbeatListener,
77-
mainThreadExecutor,
77+
scheduledExecutor,
7878
log,
7979
new HeartbeatMonitorImpl.Factory<>());
8080
}
@@ -83,7 +83,7 @@ public HeartbeatManagerImpl(
8383
long heartbeatTimeoutIntervalMs,
8484
InstanceID ownInstanceID,
8585
HeartbeatListener<I, O> heartbeatListener,
86-
ScheduledExecutor mainThreadExecutor,
86+
ScheduledExecutorService scheduledExecutor,
8787
Logger log,
8888
HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
8989

@@ -93,12 +93,10 @@ public HeartbeatManagerImpl(
9393
this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
9494
this.ownInstanceID = checkNotNull(ownInstanceID);
9595
this.heartbeatListener = checkNotNull(heartbeatListener);
96-
this.mainThreadExecutor = checkNotNull(mainThreadExecutor);
96+
this.scheduledExecutor = checkNotNull(scheduledExecutor);
9797
this.log = checkNotNull(log);
9898
this.heartbeatMonitorFactory = heartbeatMonitorFactory;
9999
this.heartbeatTargets = new ConcurrentHashMap<>(16);
100-
101-
stopped = false;
102100
}
103101

104102
// ----------------------------------------------------------------------------------------------
@@ -124,25 +122,29 @@ public Map<InstanceID, HeartbeatMonitor<O>> getHeartbeatTargets() {
124122
@Override
125123
public void monitorTarget(InstanceID instanceID, HeartbeatTarget<O> heartbeatTarget) {
126124
if (!stopped) {
127-
if (heartbeatTargets.containsKey(instanceID)) {
128-
log.debug("The target with instance ID {} is already been monitored.", instanceID);
129-
} else {
130-
HeartbeatMonitor<O> heartbeatMonitor =
131-
heartbeatMonitorFactory.createHeartbeatMonitor(
132-
instanceID,
133-
heartbeatTarget,
134-
mainThreadExecutor,
135-
heartbeatListener,
136-
heartbeatTimeoutIntervalMs);
137-
138-
heartbeatTargets.put(instanceID, heartbeatMonitor);
139-
140-
// check if we have stopped in the meantime (concurrent stop operation)
141-
if (stopped) {
142-
heartbeatMonitor.cancel();
143-
144-
heartbeatTargets.remove(instanceID);
145-
}
125+
// always monitor the new target to avoid the old monitor not in running state
126+
HeartbeatMonitor<O> heartbeatMonitor =
127+
heartbeatMonitorFactory.createHeartbeatMonitor(
128+
instanceID,
129+
heartbeatTarget,
130+
scheduledExecutor,
131+
heartbeatListener,
132+
heartbeatTimeoutIntervalMs,
133+
log);
134+
135+
HeartbeatMonitor<O> oldMonitor = heartbeatTargets.put(instanceID, heartbeatMonitor);
136+
if (oldMonitor != null) {
137+
oldMonitor.cancel();
138+
log.debug(
139+
"Replace the heartbeat monitor of target (instance id: {}) with new one.",
140+
instanceID);
141+
}
142+
143+
// check if we have stopped in the meantime (concurrent stop operation)
144+
if (stopped) {
145+
heartbeatMonitor.cancel();
146+
147+
heartbeatTargets.remove(instanceID);
146148
}
147149
}
148150
}
@@ -163,10 +165,15 @@ public void stop() {
163165
stopped = true;
164166

165167
for (HeartbeatMonitor<O> heartbeatMonitor : heartbeatTargets.values()) {
166-
heartbeatMonitor.cancel();
168+
try {
169+
heartbeatMonitor.cancel();
170+
} catch (Throwable throwable) {
171+
log.warn("Failed to cancel heartbeat.", throwable);
172+
}
167173
}
168174

169175
heartbeatTargets.clear();
176+
scheduledExecutor.shutdown();
170177
}
171178

172179
@Override
@@ -180,8 +187,8 @@ public long getLastHeartbeatFrom(InstanceID instanceID) {
180187
}
181188
}
182189

183-
ScheduledExecutor getMainThreadExecutor() {
184-
return mainThreadExecutor;
190+
ScheduledExecutorService getScheduledExecutor() {
191+
return scheduledExecutor;
185192
}
186193

187194
// ----------------------------------------------------------------------------------------------

shuffle-coordinator/src/main/java/com/alibaba/flink/shuffle/coordinator/heartbeat/HeartbeatManagerSenderImpl.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
package com.alibaba.flink.shuffle.coordinator.heartbeat;
1818

1919
import com.alibaba.flink.shuffle.core.ids.InstanceID;
20-
import com.alibaba.flink.shuffle.rpc.executor.ScheduledExecutor;
2120

2221
import org.slf4j.Logger;
2322

23+
import java.util.concurrent.ScheduledExecutorService;
2424
import java.util.concurrent.TimeUnit;
2525

2626
/**
@@ -40,14 +40,14 @@ public HeartbeatManagerSenderImpl(
4040
long heartbeatTimeout,
4141
InstanceID ownInstanceID,
4242
HeartbeatListener<I, O> heartbeatListener,
43-
ScheduledExecutor mainThreadExecutor,
43+
ScheduledExecutorService scheduledExecutor,
4444
Logger log) {
4545
this(
4646
heartbeatPeriod,
4747
heartbeatTimeout,
4848
ownInstanceID,
4949
heartbeatListener,
50-
mainThreadExecutor,
50+
scheduledExecutor,
5151
log,
5252
new HeartbeatMonitorImpl.Factory<>());
5353
}
@@ -57,30 +57,34 @@ public HeartbeatManagerSenderImpl(
5757
long heartbeatTimeout,
5858
InstanceID ownInstanceID,
5959
HeartbeatListener<I, O> heartbeatListener,
60-
ScheduledExecutor mainThreadExecutor,
60+
ScheduledExecutorService scheduledExecutor,
6161
Logger log,
6262
HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
6363
super(
6464
heartbeatTimeout,
6565
ownInstanceID,
6666
heartbeatListener,
67-
mainThreadExecutor,
67+
scheduledExecutor,
6868
log,
6969
heartbeatMonitorFactory);
7070

7171
this.heartbeatPeriod = heartbeatPeriod;
72-
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
72+
getScheduledExecutor().schedule(this, 0L, TimeUnit.MILLISECONDS);
7373
}
7474

7575
@Override
7676
public void run() {
7777
if (!stopped) {
7878
log.debug("Trigger heartbeat request.");
7979
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
80-
requestHeartbeat(heartbeatMonitor);
80+
try {
81+
requestHeartbeat(heartbeatMonitor);
82+
} catch (Throwable throwable) {
83+
log.warn("Failed to request heartbeat.", throwable);
84+
}
8185
}
8286

83-
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
87+
getScheduledExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
8488
}
8589
}
8690

shuffle-coordinator/src/main/java/com/alibaba/flink/shuffle/coordinator/heartbeat/HeartbeatMonitor.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
package com.alibaba.flink.shuffle.coordinator.heartbeat;
1818

1919
import com.alibaba.flink.shuffle.core.ids.InstanceID;
20-
import com.alibaba.flink.shuffle.rpc.executor.ScheduledExecutor;
20+
21+
import org.slf4j.Logger;
22+
23+
import java.util.concurrent.ScheduledExecutorService;
2124

2225
/**
2326
* Heartbeat monitor which manages the heartbeat state of the associated heartbeat target. The
@@ -66,16 +69,17 @@ interface Factory<O> {
6669
*
6770
* @param instanceID the resource id
6871
* @param heartbeatTarget the heartbeat target
69-
* @param mainThreadExecutor the main thread executor
7072
* @param heartbeatListener the heartbeat listener
7173
* @param heartbeatTimeoutIntervalMs the heartbeat timeout interval ms
74+
* @param log for logging
7275
* @return the heartbeat monitor
7376
*/
7477
HeartbeatMonitor<O> createHeartbeatMonitor(
7578
InstanceID instanceID,
7679
HeartbeatTarget<O> heartbeatTarget,
77-
ScheduledExecutor mainThreadExecutor,
80+
ScheduledExecutorService scheduledExecutor,
7881
HeartbeatListener<?, O> heartbeatListener,
79-
long heartbeatTimeoutIntervalMs);
82+
long heartbeatTimeoutIntervalMs,
83+
Logger log);
8084
}
8185
}

shuffle-coordinator/src/main/java/com/alibaba/flink/shuffle/coordinator/heartbeat/HeartbeatMonitorImpl.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package com.alibaba.flink.shuffle.coordinator.heartbeat;
1818

1919
import com.alibaba.flink.shuffle.core.ids.InstanceID;
20-
import com.alibaba.flink.shuffle.rpc.executor.ScheduledExecutor;
2120

21+
import org.slf4j.Logger;
22+
23+
import java.util.concurrent.ScheduledExecutorService;
2224
import java.util.concurrent.ScheduledFuture;
2325
import java.util.concurrent.TimeUnit;
2426
import java.util.concurrent.atomic.AtomicReference;
@@ -39,7 +41,7 @@ public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
3941
/** Associated heartbeat target. */
4042
private final HeartbeatTarget<O> heartbeatTarget;
4143

42-
private final ScheduledExecutor scheduledExecutor;
44+
private final ScheduledExecutorService scheduledExecutor;
4345

4446
/** Listener which is notified about heartbeat timeouts. */
4547
private final HeartbeatListener<?, ?> heartbeatListener;
@@ -51,27 +53,29 @@ public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
5153

5254
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
5355

56+
protected final Logger log;
57+
5458
private volatile long lastHeartbeat;
5559

5660
HeartbeatMonitorImpl(
5761
InstanceID instanceID,
5862
HeartbeatTarget<O> heartbeatTarget,
59-
ScheduledExecutor scheduledExecutor,
63+
ScheduledExecutorService scheduledExecutor,
6064
HeartbeatListener<?, O> heartbeatListener,
61-
long heartbeatTimeoutIntervalMs) {
65+
long heartbeatTimeoutIntervalMs,
66+
Logger log) {
6267

6368
this.instanceID = checkNotNull(instanceID);
6469
this.heartbeatTarget = checkNotNull(heartbeatTarget);
6570
this.scheduledExecutor = checkNotNull(scheduledExecutor);
6671
this.heartbeatListener = checkNotNull(heartbeatListener);
72+
this.log = checkNotNull(log);
6773

6874
checkArgument(
6975
heartbeatTimeoutIntervalMs > 0L,
7076
"The heartbeat timeout interval has to be larger than 0.");
7177
this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
7278

73-
lastHeartbeat = 0L;
74-
7579
resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
7680
}
7781

@@ -106,16 +110,16 @@ public void cancel() {
106110

107111
@Override
108112
public void run() {
109-
// The heartbeat has timed out if we're in state running
110-
if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
111-
heartbeatListener.notifyHeartbeatTimeout(instanceID);
113+
try {
114+
// The heartbeat has timed out if we're in state running
115+
if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
116+
heartbeatListener.notifyHeartbeatTimeout(instanceID);
117+
}
118+
} catch (Throwable throwable) {
119+
log.error("Failed to notify heartbeat timeout.", throwable);
112120
}
113121
}
114122

115-
public boolean isCanceled() {
116-
return state.get() == State.CANCELED;
117-
}
118-
119123
void resetHeartbeatTimeout(long heartbeatTimeout) {
120124
if (state.get() == State.RUNNING) {
121125
cancelTimeout();
@@ -153,16 +157,18 @@ static class Factory<O> implements HeartbeatMonitor.Factory<O> {
153157
public HeartbeatMonitor<O> createHeartbeatMonitor(
154158
InstanceID instanceID,
155159
HeartbeatTarget<O> heartbeatTarget,
156-
ScheduledExecutor mainThreadExecutor,
160+
ScheduledExecutorService scheduledExecutor,
157161
HeartbeatListener<?, O> heartbeatListener,
158-
long heartbeatTimeoutIntervalMs) {
162+
long heartbeatTimeoutIntervalMs,
163+
Logger log) {
159164

160165
return new HeartbeatMonitorImpl<>(
161166
instanceID,
162167
heartbeatTarget,
163-
mainThreadExecutor,
168+
scheduledExecutor,
164169
heartbeatListener,
165-
heartbeatTimeoutIntervalMs);
170+
heartbeatTimeoutIntervalMs,
171+
log);
166172
}
167173
}
168174
}

0 commit comments

Comments
 (0)