Skip to content

Commit 071338e

Browse files
author
Rob Harrop
committed
Reworked HeartbeatSender to remove race with early shutdown
1 parent 1d13f2f commit 071338e

File tree

1 file changed

+37
-35
lines changed

1 file changed

+37
-35
lines changed

src/com/rabbitmq/client/impl/HeartbeatSender.java

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ final class HeartbeatSender {
5757

5858
private ScheduledFuture<?> future;
5959

60+
private boolean shutdown = false;
61+
6062
private volatile long lastActivityTime;
6163

6264
HeartbeatSender(FrameHandler frameHandler) {
@@ -71,30 +73,28 @@ public void signalActivity() {
7173
* Sets the heartbeat in seconds.
7274
*/
7375
public void setHeartbeat(int heartbeatSeconds) {
74-
ScheduledFuture<?> previousFuture;
75-
synchronized (this.monitor) {
76-
previousFuture = this.future;
77-
this.future = null;
78-
}
76+
synchronized(this.monitor) {
77+
if(this.shutdown) {
78+
throw new IllegalStateException("HeartbeatSender is shutdown." +
79+
" Cannot set new interval.");
80+
}
7981

80-
if (previousFuture != null) {
81-
previousFuture.cancel(true);
82-
}
82+
// cancel any existing heartbeat task
83+
if(this.future != null) {
84+
this.future.cancel(true);
85+
this.future = null;
86+
}
8387

84-
if (heartbeatSeconds > 0) {
85-
// wake every heartbeatSeconds / 2 to avoid the worst case
86-
// where the last activity comes just after the last heartbeat
87-
long interval = SECONDS.toMillis(heartbeatSeconds) / 2;
88-
ScheduledExecutorService executor = createExecutorIfNecessary();
89-
Runnable task = new HeartbeatRunnable(interval);
90-
ScheduledFuture<?> newFuture = executor.scheduleAtFixedRate(
88+
if (heartbeatSeconds > 0) {
89+
// wake every heartbeatSeconds / 2 to avoid the worst case
90+
// where the last activity comes just after the last heartbeat
91+
long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
92+
ScheduledExecutorService executor = createExecutorIfNecessary();
93+
Runnable task = new HeartbeatRunnable(interval);
94+
this.future = executor.scheduleAtFixedRate(
9195
task, interval, interval, TimeUnit.MILLISECONDS);
92-
93-
synchronized (this.monitor) {
94-
this.future = newFuture;
9596
}
9697
}
97-
9898
}
9999

100100
private ScheduledExecutorService createExecutorIfNecessary() {
@@ -110,31 +110,33 @@ private ScheduledExecutorService createExecutorIfNecessary() {
110110
* Shutdown the heartbeat process, if any.
111111
*/
112112
public void shutdown() {
113-
ScheduledFuture<?> future;
114-
ScheduledExecutorService executor;
115-
113+
ScheduledExecutorService executorToShutdown = null;
116114
synchronized (this.monitor) {
117-
future = this.future;
118-
executor = this.executor;
119-
this.future = null;
120-
this.executor = null;
121-
}
115+
if (this.future != null) {
116+
this.future.cancel(true);
117+
this.future = null;
118+
}
122119

123-
if (future != null) {
124-
future.cancel(true);
125-
}
120+
if (this.executor != null) {
121+
// to be safe, we shouldn't call shutdown holding the
122+
// monitor.
123+
executorToShutdown = this.executor;
126124

127-
if (executor != null) {
128-
executor.shutdown();
125+
this.shutdown = true;
126+
this.executor = null;
127+
}
128+
}
129+
if(executorToShutdown != null) {
130+
executorToShutdown.shutdown();
129131
}
130132
}
131133

132-
private class HeartbeatRunnable implements Runnable {
134+
private final class HeartbeatRunnable implements Runnable {
133135

134136
private final long heartbeatNanos;
135137

136-
private HeartbeatRunnable(long heartbeatMillis) {
137-
this.heartbeatNanos = MILLISECONDS.toNanos(heartbeatMillis);
138+
private HeartbeatRunnable(long heartbeatNanos) {
139+
this.heartbeatNanos = heartbeatNanos;
138140
}
139141

140142
public void run() {

0 commit comments

Comments
 (0)