Skip to content

Commit e7874ab

Browse files
committed
Add proper synchronization to web socket clean-up
1 parent 1121d12 commit e7874ab

File tree

2 files changed

+47
-31
lines changed

2 files changed

+47
-31
lines changed

services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/websocket/WebSocket.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,15 @@ public void dispose() {
187187
* Sets the time of last received pong message.
188188
* @param instant Time of last received pong message.
189189
*/
190-
public void setLastPinged(Instant instant) {
190+
public synchronized void setLastPinged(Instant instant) {
191191
this.lastPinged = instant;
192192
}
193193

194194
/**
195195
*
196196
* @return The time of last received pong message.
197197
*/
198-
public Instant getLastPinged() {
198+
public synchronized Instant getLastPinged() {
199199
return lastPinged;
200200
}
201201

services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/websocket/WebSocketHandler.java

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,11 @@ public class WebSocketHandler extends TextWebSocketHandler {
6969
public void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) {
7070
try {
7171
// Find the WebSocket instance associated with the WebSocketSession
72-
Optional<WebSocket> webSocketOptional =
73-
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
72+
Optional<WebSocket> webSocketOptional;
73+
synchronized (sockets){
74+
webSocketOptional =
75+
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
76+
}
7477
if (webSocketOptional.isEmpty()) {
7578
return; // Should only happen in case of timing issues?
7679
}
@@ -102,8 +105,10 @@ public void afterConnectionEstablished(@NonNull WebSocketSession session) {
102105
*/
103106
@Override
104107
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) {
105-
Optional<WebSocket> webSocketOptional =
106-
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
108+
Optional<WebSocket> webSocketOptional;
109+
synchronized (sockets){
110+
webSocketOptional = sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
111+
}
107112
if (webSocketOptional.isPresent()) {
108113
logger.log(Level.INFO, "Closing web socket session " + webSocketOptional.get().getDescription());
109114
webSocketOptional.get().dispose();
@@ -137,8 +142,10 @@ public void handleTransportError(@NonNull WebSocketSession session, @NonNull Thr
137142
protected void handlePongMessage(@NonNull WebSocketSession session, @NonNull PongMessage message) {
138143
logger.log(Level.FINE, "Got pong for session " + session.getId());
139144
// Find the WebSocket instance associated with this WebSocketSession
140-
Optional<WebSocket> webSocketOptional =
141-
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
145+
Optional<WebSocket> webSocketOptional;
146+
synchronized (sockets) {
147+
webSocketOptional = sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
148+
}
142149
if (webSocketOptional.isPresent()) {
143150
webSocketOptional.get().setLastPinged(Instant.now());
144151
}
@@ -156,20 +163,24 @@ private String shorten(final String message) {
156163

157164
@PreDestroy
158165
public void cleanup() {
159-
sockets.forEach(s -> {
160-
logger.log(Level.INFO, "Disposing socket " + s.getDescription());
161-
s.dispose();
162-
});
166+
synchronized (sockets) {
167+
sockets.forEach(s -> {
168+
logger.log(Level.INFO, "Disposing socket " + s.getDescription());
169+
s.dispose();
170+
});
171+
}
163172
}
164173

165174
public void sendMessage(SaveAndRestoreWebSocketMessage webSocketMessage) {
166-
sockets.forEach(ws -> {
167-
try {
168-
ws.queueMessage(objectMapper.writeValueAsString(webSocketMessage));
169-
} catch (JsonProcessingException e) {
170-
throw new RuntimeException(e);
171-
}
172-
});
175+
synchronized (sockets) {
176+
sockets.forEach(ws -> {
177+
try {
178+
ws.queueMessage(objectMapper.writeValueAsString(webSocketMessage));
179+
} catch (JsonProcessingException e) {
180+
throw new RuntimeException(e);
181+
}
182+
});
183+
}
173184
}
174185

175186
/**
@@ -180,9 +191,11 @@ public void sendMessage(SaveAndRestoreWebSocketMessage webSocketMessage) {
180191
*
181192
*/
182193
@SuppressWarnings("unused")
183-
@Scheduled(cron = "* 0 * * * *")
194+
@Scheduled(cron = "0 * * * * *")
184195
public void pingClients(){
185-
sockets.forEach(WebSocket::sendPing);
196+
synchronized (sockets) {
197+
sockets.forEach(WebSocket::sendPing);
198+
}
186199
}
187200

188201
/**
@@ -194,18 +207,21 @@ public void pingClients(){
194207
*
195208
*/
196209
@SuppressWarnings("unused")
197-
@Scheduled(cron = "* 5 * * * *")
210+
@Scheduled(cron = "10 * * * * *")
198211
public void cleanUpDeadSockets(){
199212
List<WebSocket> deadSockets = new ArrayList<>();
200213
Instant now = Instant.now();
201-
sockets.forEach(s -> {
202-
if(s.getLastPinged() != null && s.getLastPinged().isBefore(now.minus(70, ChronoUnit.MINUTES))){
203-
deadSockets.add(s);
204-
}
205-
});
206-
deadSockets.forEach(d -> {
207-
sockets.remove(d);
208-
d.dispose();
209-
});
214+
synchronized (sockets) {
215+
sockets.forEach(s -> {
216+
Instant lastPinged = s.getLastPinged();
217+
if (lastPinged != null && lastPinged.isBefore(now.minus(70, ChronoUnit.MINUTES))) {
218+
deadSockets.add(s);
219+
}
220+
});
221+
deadSockets.forEach(d -> {
222+
sockets.remove(d);
223+
d.dispose();
224+
});
225+
}
210226
}
211227
}

0 commit comments

Comments
 (0)