Skip to content

Commit a6778c2

Browse files
authored
Merge pull request #3399 from ControlSystemStudio/CSSTUDIO-1967
Periodic check of web socket client connection
2 parents fe56add + a8a3579 commit a6778c2

File tree

3 files changed

+150
-54
lines changed

3 files changed

+150
-54
lines changed

core/websocket/src/main/java/org/phoebus/core/websocket/WebSocketClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class WebSocketClient implements WebSocket.Listener {
3535
private final Consumer<CharSequence> onTextCallback;
3636

3737
private final AtomicBoolean attemptReconnect = new AtomicBoolean();
38+
private final AtomicBoolean keepPinging = new AtomicBoolean();
3839
private CountDownLatch pingCountdownLatch;
3940

4041
/**
@@ -90,6 +91,7 @@ public void onOpen(WebSocket webSocket) {
9091
connectCallback.run();
9192
}
9293
logger.log(Level.INFO, "Connected to " + uri);
94+
keepPinging.set(true);
9395
new Thread(new PingRunnable()).start();
9496
}
9597

@@ -134,7 +136,7 @@ public CompletionStage<?> onClose(WebSocket webSocket,
134136
* is called.
135137
*/
136138
public void sendPing() {
137-
logger.log(Level.FINE, "Sending ping");
139+
logger.log(Level.FINE, Thread.currentThread().getName() + " Sending ping");
138140
webSocket.sendPing(ByteBuffer.allocate(0));
139141
}
140142

@@ -175,6 +177,7 @@ public CompletionStage<?> onText(WebSocket webSocket,
175177
* @param reason Custom reason text.
176178
*/
177179
public void close(String reason) {
180+
keepPinging.set(false);
178181
webSocket.sendClose(1000, reason);
179182
}
180183

@@ -197,7 +200,7 @@ private class PingRunnable implements Runnable {
197200

198201
@Override
199202
public void run() {
200-
while (true) {
203+
while (keepPinging.get()) {
201204
pingCountdownLatch = new CountDownLatch(1);
202205
sendPing();
203206
try {

services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/websocket/WebSocket.java

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,20 @@
88

99
import com.fasterxml.jackson.databind.JsonNode;
1010
import com.fasterxml.jackson.databind.ObjectMapper;
11+
import org.springframework.web.socket.PingMessage;
1112
import org.springframework.web.socket.TextMessage;
1213
import org.springframework.web.socket.WebSocketSession;
1314

15+
import java.io.IOException;
16+
import java.net.InetSocketAddress;
17+
import java.time.Instant;
1418
import java.util.concurrent.ArrayBlockingQueue;
1519
import java.util.concurrent.atomic.AtomicBoolean;
1620
import java.util.logging.Level;
1721
import java.util.logging.Logger;
1822

1923
/**
20-
* Utility class for handling web socket messages. In the context of the save-and-restore service,
21-
* only messages from server are expected. Client messages are logged, but do not invoke any behavior.
24+
* Utility class for handling web socket messages.
2225
*/
2326
@SuppressWarnings("nls")
2427
public class WebSocket {
@@ -41,11 +44,16 @@ public class WebSocket {
4144

4245
private final WebSocketSession session;
4346
private final String id;
44-
47+
private final String description;
4548
private final Logger logger = Logger.getLogger(WebSocket.class.getName());
46-
4749
private final ObjectMapper objectMapper;
4850

51+
/**
52+
* Keeps track of when this session was used for a ping/pong exchange. Should be set to non-null value ONLY
53+
* when an actual pong was received by {@link WebSocketHandler}.
54+
*/
55+
private Instant lastPinged;
56+
4957
/**
5058
* Constructor
5159
*/
@@ -58,6 +66,8 @@ public WebSocket(ObjectMapper objectMapper, WebSocketSession webSocketSession) {
5866
writeThread.setName("Web Socket Write Thread " + this.id);
5967
writeThread.setDaemon(true);
6068
writeThread.start();
69+
InetSocketAddress inetSocketAddress = webSocketSession.getRemoteAddress();
70+
this.description = this.id + "/" + (inetSocketAddress != null ? inetSocketAddress.getAddress().toString() : "IP address unknown");
6171
}
6272

6373
/**
@@ -70,6 +80,14 @@ public String getId() {
7080
return id;
7181
}
7282

83+
/**
84+
*
85+
* @return A description containing the session ID and - if available - the associated IP address.
86+
*/
87+
public String getDescription() {
88+
return description;
89+
}
90+
7391
/**
7492
* @param message Potentially long message
7593
* @return Message shorted to 200 chars
@@ -137,7 +155,7 @@ private void writeQueuedMessages() {
137155
}
138156

139157
/**
140-
* Called when client sends a general message
158+
* Called when client sends a generic message
141159
*
142160
* @param message {@link TextMessage}, its payload is expected to be JSON.
143161
*/
@@ -150,12 +168,6 @@ public void handleTextMessage(TextMessage message) throws Exception {
150168
logger.log(Level.INFO, "Client message type: " + type);
151169
}
152170

153-
/**
154-
* Clears all PVs
155-
*
156-
* <p>Web socket calls this onClose(),
157-
* but context may also call this again just in case
158-
*/
159171
public void dispose() {
160172
// Exit write thread
161173
try {
@@ -166,8 +178,35 @@ public void dispose() {
166178
// TODO: is this needed?
167179
session.close();
168180
} catch (Throwable ex) {
169-
logger.log(Level.WARNING, "Error disposing " + getId(), ex);
181+
logger.log(Level.WARNING, "Error disposing " + description, ex);
182+
}
183+
logger.log(Level.INFO, () -> "Web socket " + description + " closed");
184+
}
185+
186+
/**
187+
* Sets the time of last received pong message.
188+
* @param instant Time of last received pong message.
189+
*/
190+
public synchronized void setLastPinged(Instant instant) {
191+
this.lastPinged = instant;
192+
}
193+
194+
/**
195+
*
196+
* @return The time of last received pong message.
197+
*/
198+
public synchronized Instant getLastPinged() {
199+
return lastPinged;
200+
}
201+
202+
/**
203+
* Sends a {@link PingMessage} to peer.
204+
*/
205+
public void sendPing() {
206+
try {
207+
session.sendMessage(new PingMessage());
208+
} catch (IOException e) {
209+
logger.log(Level.WARNING, "Failed to send ping message", e);
170210
}
171-
logger.log(Level.INFO, () -> "Web socket " + session.getId() + " closed");
172211
}
173212
}

services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/websocket/WebSocketHandler.java

Lines changed: 93 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,5 @@
11
/*
2-
* Copyright (C) 2023 European Spallation Source ERIC.
3-
*
4-
* This program is free software; you can redistribute it and/or
5-
* modify it under the terms of the GNU General Public License
6-
* as published by the Free Software Foundation; either version 2
7-
* of the License, or (at your option) any later version.
8-
*
9-
* This program is distributed in the hope that it will be useful,
10-
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11-
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12-
* GNU General Public License for more details.
13-
*
14-
* You should have received a copy of the GNU General Public License
15-
* along with this program; if not, write to the Free Software
16-
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
2+
* Copyright (C) 2025 European Spallation Source ERIC.
173
*
184
*/
195

@@ -24,6 +10,7 @@
2410
import org.phoebus.applications.saveandrestore.model.websocket.SaveAndRestoreWebSocketMessage;
2511
import org.springframework.beans.factory.annotation.Autowired;
2612
import org.springframework.lang.NonNull;
13+
import org.springframework.scheduling.annotation.Scheduled;
2714
import org.springframework.stereotype.Component;
2815
import org.springframework.web.socket.CloseStatus;
2916
import org.springframework.web.socket.PongMessage;
@@ -33,14 +20,27 @@
3320

3421
import javax.annotation.PreDestroy;
3522
import java.io.EOFException;
23+
import java.net.InetSocketAddress;
24+
import java.time.Instant;
25+
import java.time.temporal.ChronoUnit;
3626
import java.util.ArrayList;
27+
import java.util.Collections;
3728
import java.util.List;
3829
import java.util.Optional;
3930
import java.util.logging.Level;
4031
import java.util.logging.Logger;
4132

4233
/**
4334
* Single web socket end-point routing messages to active {@link WebSocket} instances.
35+
*
36+
* <p>
37+
* In some cases web socket clients may become stale/disconnected for various reasons, e.g. network issues. The
38+
* {@link #afterConnectionClosed(WebSocketSession, CloseStatus)} is not necessarily called in those case.
39+
* To make sure the {@link #sockets} collection does not contain stale clients, a scheduled job runs once per hour to
40+
* ping all clients, and set the time when the pong response was received. Another scheduled job will check
41+
* the last received pong message timestamp and - if older than 70 minutes - consider the client session dead
42+
* and dispose of it.
43+
* </p>
4444
*/
4545
@Component
4646
public class WebSocketHandler extends TextWebSocketHandler {
@@ -49,7 +49,7 @@ public class WebSocketHandler extends TextWebSocketHandler {
4949
* List of active {@link WebSocket}
5050
*/
5151
@SuppressWarnings("unused")
52-
private List<WebSocket> sockets = new ArrayList<>();
52+
private List<WebSocket> sockets = Collections.synchronizedList(new ArrayList<>());
5353

5454
@SuppressWarnings("unused")
5555
@Autowired
@@ -69,8 +69,11 @@ public class WebSocketHandler extends TextWebSocketHandler {
6969
public void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) {
7070
try {
7171
// Find the WebSocket instance associated with the WebSocketSession
72-
Optional<WebSocket> webSocketOptional =
73-
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
72+
Optional<WebSocket> webSocketOptional;
73+
synchronized (sockets){
74+
webSocketOptional =
75+
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
76+
}
7477
if (webSocketOptional.isEmpty()) {
7578
return; // Should only happen in case of timing issues?
7679
}
@@ -87,7 +90,8 @@ public void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMe
8790
*/
8891
@Override
8992
public void afterConnectionEstablished(@NonNull WebSocketSession session) {
90-
logger.log(Level.INFO, "Opening web socket session from remote " + session.getRemoteAddress().getAddress());
93+
InetSocketAddress inetSocketAddress = session.getRemoteAddress();
94+
logger.log(Level.INFO, "Opening web socket session from remote " + (inetSocketAddress != null ? inetSocketAddress.getAddress().toString() : "<unknown IP address>"));
9195
WebSocket webSocket = new WebSocket(objectMapper, session);
9296
sockets.add(webSocket);
9397
}
@@ -101,10 +105,12 @@ public void afterConnectionEstablished(@NonNull WebSocketSession session) {
101105
*/
102106
@Override
103107
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) {
104-
Optional<WebSocket> webSocketOptional =
105-
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
108+
Optional<WebSocket> webSocketOptional;
109+
synchronized (sockets){
110+
webSocketOptional = sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
111+
}
106112
if (webSocketOptional.isPresent()) {
107-
logger.log(Level.INFO, "Closing web socket session from remote " + session.getRemoteAddress().getAddress());
113+
logger.log(Level.INFO, "Closing web socket session " + webSocketOptional.get().getDescription());
108114
webSocketOptional.get().dispose();
109115
sockets.remove(webSocketOptional.get());
110116
}
@@ -126,20 +132,22 @@ public void handleTransportError(@NonNull WebSocketSession session, @NonNull Thr
126132
}
127133

128134
/**
129-
* Called when client sends ping message, i.e. a pong message is sent and time for last message
135+
* Called when client sends ping message, i.e. a pong message is sent and time for last pong response message
130136
* in the {@link WebSocket} instance is refreshed.
131137
*
132138
* @param session Associated {@link WebSocketSession}
133139
* @param message See {@link PongMessage}
134140
*/
135141
@Override
136142
protected void handlePongMessage(@NonNull WebSocketSession session, @NonNull PongMessage message) {
137-
logger.log(Level.INFO, "Got pong");
143+
logger.log(Level.FINE, "Got pong for session " + session.getId());
138144
// Find the WebSocket instance associated with this WebSocketSession
139-
Optional<WebSocket> webSocketOptional =
140-
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
141-
if (webSocketOptional.isEmpty()) {
142-
return; // Should only happen in case of timing issues?
145+
Optional<WebSocket> webSocketOptional;
146+
synchronized (sockets) {
147+
webSocketOptional = sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
148+
}
149+
if (webSocketOptional.isPresent()) {
150+
webSocketOptional.get().setLastPinged(Instant.now());
143151
}
144152
}
145153

@@ -155,19 +163,65 @@ private String shorten(final String message) {
155163

156164
@PreDestroy
157165
public void cleanup() {
158-
sockets.forEach(s -> {
159-
logger.log(Level.INFO, "Disposing socket " + s.getId());
160-
s.dispose();
161-
});
166+
synchronized (sockets) {
167+
sockets.forEach(s -> {
168+
logger.log(Level.INFO, "Disposing socket " + s.getDescription());
169+
s.dispose();
170+
});
171+
}
162172
}
163173

164174
public void sendMessage(SaveAndRestoreWebSocketMessage webSocketMessage) {
165-
sockets.forEach(ws -> {
166-
try {
167-
ws.queueMessage(objectMapper.writeValueAsString(webSocketMessage));
168-
} catch (JsonProcessingException e) {
169-
throw new RuntimeException(e);
170-
}
171-
});
175+
synchronized (sockets) {
176+
sockets.forEach(ws -> {
177+
try {
178+
ws.queueMessage(objectMapper.writeValueAsString(webSocketMessage));
179+
} catch (JsonProcessingException e) {
180+
throw new RuntimeException(e);
181+
}
182+
});
183+
}
184+
}
185+
186+
/**
187+
* Sends a ping message to all clients contained in {@link #sockets}.
188+
* <p>
189+
* This is scheduled to run at the top of each hour, i.e. 00.00, 01.00...23.00
190+
* </p>
191+
*
192+
*/
193+
@SuppressWarnings("unused")
194+
@Scheduled(cron = "* 0 * * * *")
195+
public void pingClients(){
196+
synchronized (sockets) {
197+
sockets.forEach(WebSocket::sendPing);
198+
}
199+
}
200+
201+
/**
202+
* For each client in {@link #sockets}, checks the timestamp of last received pong message. If this is older
203+
* than 70 minutes, the socket is considered dead, and then disposed.
204+
* <p>
205+
* This is scheduled to run 5 minutes past each hour, i.e. 00.05, 01.05...23.05
206+
* </p>
207+
*
208+
*/
209+
@SuppressWarnings("unused")
210+
@Scheduled(cron = "* 5 * * * *")
211+
public void cleanUpDeadSockets(){
212+
List<WebSocket> deadSockets = new ArrayList<>();
213+
Instant now = Instant.now();
214+
synchronized (sockets) {
215+
sockets.forEach(s -> {
216+
Instant lastPinged = s.getLastPinged();
217+
if (lastPinged != null && lastPinged.isBefore(now.minus(70, ChronoUnit.MINUTES))) {
218+
deadSockets.add(s);
219+
}
220+
});
221+
deadSockets.forEach(d -> {
222+
sockets.remove(d);
223+
d.dispose();
224+
});
225+
}
172226
}
173227
}

0 commit comments

Comments
 (0)