Skip to content

Commit 91387a5

Browse files
committed
Harden synchronization around SockJS heartbeats
Create an explicit heartbeat task with an experiration flag so that it can be cancelled reliably vs relying on the ScheduledFutureTask cancel method which may return true even if the task is already running. Issue: SPR-14356
1 parent 5b0722e commit 91387a5

File tree

2 files changed

+53
-61
lines changed

2 files changed

+53
-61
lines changed

spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/session/AbstractSockJsSession.java

Lines changed: 52 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import java.util.Set;
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.ScheduledFuture;
30-
import java.util.concurrent.locks.Lock;
31-
import java.util.concurrent.locks.ReentrantLock;
3230

3331
import org.apache.commons.logging.Log;
3432
import org.apache.commons.logging.LogFactory;
@@ -106,9 +104,11 @@ private enum State {NEW, OPEN, CLOSED}
106104

107105
private volatile long timeLastActive = this.timeCreated;
108106

109-
private volatile ScheduledFuture<?> heartbeatTask;
107+
private ScheduledFuture<?> heartbeatFuture;
110108

111-
private final Lock heartbeatLock = new ReentrantLock();
109+
private HeartbeatTask heartbeatTask;
110+
111+
private final Object heartbeatLock = new Object();
112112

113113
private volatile boolean heartbeatDisabled;
114114

@@ -249,19 +249,10 @@ public void disableHeartbeat() {
249249
}
250250

251251
public void sendHeartbeat() throws SockJsTransportFailureException {
252-
if (isActive()) {
253-
if (heartbeatLock.tryLock()) {
254-
try {
255-
if (this.heartbeatTask == null) {
256-
// Cancelled while waiting to acquire the lock
257-
return;
258-
}
259-
writeFrame(SockJsFrame.heartbeatFrame());
260-
scheduleHeartbeat();
261-
}
262-
finally {
263-
heartbeatLock.unlock();
264-
}
252+
synchronized (this.heartbeatLock) {
253+
if (isActive() && !this.heartbeatDisabled) {
254+
writeFrame(SockJsFrame.heartbeatFrame());
255+
scheduleHeartbeat();
265256
}
266257
}
267258
}
@@ -270,56 +261,33 @@ protected void scheduleHeartbeat() {
270261
if (this.heartbeatDisabled) {
271262
return;
272263
}
273-
274-
Assert.state(this.config.getTaskScheduler() != null, "Expected SockJS TaskScheduler");
275-
cancelHeartbeat();
276-
if (!isActive()) {
277-
return;
278-
}
279-
280-
Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime());
281-
this.heartbeatTask = this.config.getTaskScheduler().schedule(new Runnable() {
282-
public void run() {
283-
try {
284-
sendHeartbeat();
285-
}
286-
catch (Throwable ex) {
287-
// ignore
288-
}
289-
}
290-
}, time);
291-
if (logger.isTraceEnabled()) {
292-
logger.trace("Scheduled heartbeat in session " + getId());
293-
}
294-
}
295-
296-
protected void cancelHeartbeat() {
297-
try {
298-
ScheduledFuture<?> task = this.heartbeatTask;
299-
this.heartbeatTask = null;
300-
if (task == null || task.isCancelled()) {
264+
synchronized (this.heartbeatLock) {
265+
cancelHeartbeat();
266+
if (!isActive()) {
301267
return;
302268
}
303-
269+
Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime());
270+
this.heartbeatTask = new HeartbeatTask();
271+
this.heartbeatFuture = this.config.getTaskScheduler().schedule(this.heartbeatTask, time);
304272
if (logger.isTraceEnabled()) {
305-
logger.trace("Cancelling heartbeat in session " + getId());
306-
}
307-
if (task.cancel(false)) {
308-
return;
273+
logger.trace("Scheduled heartbeat in session " + getId());
309274
}
275+
}
276+
}
310277

311-
if (logger.isTraceEnabled()) {
312-
logger.trace("Failed to cancel heartbeat, acquiring heartbeat write lock.");
278+
protected void cancelHeartbeat() {
279+
synchronized (this.heartbeatLock) {
280+
if (this.heartbeatFuture != null) {
281+
if (logger.isTraceEnabled()) {
282+
logger.trace("Cancelling heartbeat in session " + getId());
283+
}
284+
this.heartbeatFuture.cancel(false);
285+
this.heartbeatFuture = null;
313286
}
314-
this.heartbeatLock.lock();
315-
316-
if (logger.isTraceEnabled()) {
317-
logger.trace("Releasing heartbeat lock.");
287+
if (this.heartbeatTask != null) {
288+
this.heartbeatTask.cancel();
289+
this.heartbeatTask = null;
318290
}
319-
this.heartbeatLock.unlock();
320-
}
321-
catch (Throwable ex) {
322-
logger.debug("Failure while cancelling heartbeat in session " + getId(), ex);
323291
}
324292
}
325293

@@ -469,4 +437,28 @@ public String toString() {
469437
return getClass().getSimpleName() + "[id=" + getId() + "]";
470438
}
471439

440+
441+
private class HeartbeatTask implements Runnable {
442+
443+
private boolean expired;
444+
445+
@Override
446+
public void run() {
447+
synchronized (heartbeatLock) {
448+
if (!this.expired) {
449+
try {
450+
sendHeartbeat();
451+
}
452+
finally {
453+
this.expired = true;
454+
}
455+
}
456+
}
457+
}
458+
459+
void cancel() {
460+
this.expired = true;
461+
}
462+
}
463+
472464
}

spring-websocket/src/test/java/org/springframework/web/socket/sockjs/transport/session/SockJsSessionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ public void scheduleHeartbeatNotActive() throws Exception {
288288
@Test
289289
public void sendHeartbeatWhenDisabled() throws Exception {
290290
this.session.disableHeartbeat();
291+
this.session.setActive(true);
291292
this.session.sendHeartbeat();
292293

293294
assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten());
@@ -310,7 +311,6 @@ public void scheduleAndCancelHeartbeat() throws Exception {
310311

311312
this.session.cancelHeartbeat();
312313

313-
verify(task).isCancelled();
314314
verify(task).cancel(false);
315315
verifyNoMoreInteractions(task);
316316
}

0 commit comments

Comments
 (0)