Skip to content

Commit 1121d12

Browse files
committed
Periodic check of web socket client connection
1 parent 543d6be commit 1121d12

File tree

2 files changed

+113
-36
lines changed

2 files changed

+113
-36
lines changed

services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/websocket/WebSocket.java

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,20 @@
88

99
import com.fasterxml.jackson.databind.JsonNode;
1010
import com.fasterxml.jackson.databind.ObjectMapper;
11+
import org.springframework.web.socket.PingMessage;
1112
import org.springframework.web.socket.TextMessage;
1213
import org.springframework.web.socket.WebSocketSession;
1314

15+
import java.io.IOException;
16+
import java.net.InetSocketAddress;
17+
import java.time.Instant;
1418
import java.util.concurrent.ArrayBlockingQueue;
1519
import java.util.concurrent.atomic.AtomicBoolean;
1620
import java.util.logging.Level;
1721
import java.util.logging.Logger;
1822

1923
/**
20-
* Utility class for handling web socket messages. In the context of the save-and-restore service,
21-
* only messages from server are expected. Client messages are logged, but do not invoke any behavior.
24+
* Utility class for handling web socket messages.
2225
*/
2326
@SuppressWarnings("nls")
2427
public class WebSocket {
@@ -41,11 +44,16 @@ public class WebSocket {
4144

4245
private final WebSocketSession session;
4346
private final String id;
44-
47+
private final String description;
4548
private final Logger logger = Logger.getLogger(WebSocket.class.getName());
46-
4749
private final ObjectMapper objectMapper;
4850

51+
/**
52+
* Keeps track of when this session was used for a ping/pong exchange. Should be set to non-null value ONLY
53+
* when an actual pong was received by {@link WebSocketHandler}.
54+
*/
55+
private Instant lastPinged;
56+
4957
/**
5058
* Constructor
5159
*/
@@ -58,6 +66,8 @@ public WebSocket(ObjectMapper objectMapper, WebSocketSession webSocketSession) {
5866
writeThread.setName("Web Socket Write Thread " + this.id);
5967
writeThread.setDaemon(true);
6068
writeThread.start();
69+
InetSocketAddress inetSocketAddress = webSocketSession.getRemoteAddress();
70+
this.description = this.id + "/" + (inetSocketAddress != null ? inetSocketAddress.getAddress().toString() : "IP address unknown");
6171
}
6272

6373
/**
@@ -70,6 +80,14 @@ public String getId() {
7080
return id;
7181
}
7282

83+
/**
84+
*
85+
* @return A description containing the session ID and - if available - the associated IP address.
86+
*/
87+
public String getDescription() {
88+
return description;
89+
}
90+
7391
/**
7492
* @param message Potentially long message
7593
* @return Message shorted to 200 chars
@@ -137,7 +155,7 @@ private void writeQueuedMessages() {
137155
}
138156

139157
/**
140-
* Called when client sends a general message
158+
* Called when client sends a generic message
141159
*
142160
* @param message {@link TextMessage}, its payload is expected to be JSON.
143161
*/
@@ -150,12 +168,6 @@ public void handleTextMessage(TextMessage message) throws Exception {
150168
logger.log(Level.INFO, "Client message type: " + type);
151169
}
152170

153-
/**
154-
* Clears all PVs
155-
*
156-
* <p>Web socket calls this onClose(),
157-
* but context may also call this again just in case
158-
*/
159171
public void dispose() {
160172
// Exit write thread
161173
try {
@@ -166,8 +178,35 @@ public void dispose() {
166178
// TODO: is this needed?
167179
session.close();
168180
} catch (Throwable ex) {
169-
logger.log(Level.WARNING, "Error disposing " + getId(), ex);
181+
logger.log(Level.WARNING, "Error disposing " + description, ex);
182+
}
183+
logger.log(Level.INFO, () -> "Web socket " + description + " closed");
184+
}
185+
186+
/**
187+
* Sets the time of last received pong message.
188+
* @param instant Time of last received pong message.
189+
*/
190+
public void setLastPinged(Instant instant) {
191+
this.lastPinged = instant;
192+
}
193+
194+
/**
195+
*
196+
* @return The time of last received pong message.
197+
*/
198+
public Instant getLastPinged() {
199+
return lastPinged;
200+
}
201+
202+
/**
203+
* Sends a {@link PingMessage} to peer.
204+
*/
205+
public void sendPing() {
206+
try {
207+
session.sendMessage(new PingMessage());
208+
} catch (IOException e) {
209+
logger.log(Level.WARNING, "Failed to send ping message", e);
170210
}
171-
logger.log(Level.INFO, () -> "Web socket " + session.getId() + " closed");
172211
}
173212
}

services/save-and-restore/src/main/java/org/phoebus/service/saveandrestore/websocket/WebSocketHandler.java

Lines changed: 61 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,5 @@
11
/*
2-
* Copyright (C) 2023 European Spallation Source ERIC.
3-
*
4-
* This program is free software; you can redistribute it and/or
5-
* modify it under the terms of the GNU General Public License
6-
* as published by the Free Software Foundation; either version 2
7-
* of the License, or (at your option) any later version.
8-
*
9-
* This program is distributed in the hope that it will be useful,
10-
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11-
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12-
* GNU General Public License for more details.
13-
*
14-
* You should have received a copy of the GNU General Public License
15-
* along with this program; if not, write to the Free Software
16-
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
2+
* Copyright (C) 2025 European Spallation Source ERIC.
173
*
184
*/
195

@@ -24,6 +10,7 @@
2410
import org.phoebus.applications.saveandrestore.model.websocket.SaveAndRestoreWebSocketMessage;
2511
import org.springframework.beans.factory.annotation.Autowired;
2612
import org.springframework.lang.NonNull;
13+
import org.springframework.scheduling.annotation.Scheduled;
2714
import org.springframework.stereotype.Component;
2815
import org.springframework.web.socket.CloseStatus;
2916
import org.springframework.web.socket.PongMessage;
@@ -33,14 +20,27 @@
3320

3421
import javax.annotation.PreDestroy;
3522
import java.io.EOFException;
23+
import java.net.InetSocketAddress;
24+
import java.time.Instant;
25+
import java.time.temporal.ChronoUnit;
3626
import java.util.ArrayList;
27+
import java.util.Collections;
3728
import java.util.List;
3829
import java.util.Optional;
3930
import java.util.logging.Level;
4031
import java.util.logging.Logger;
4132

4233
/**
4334
* Single web socket end-point routing messages to active {@link WebSocket} instances.
35+
*
36+
* <p>
37+
* In some cases web socket clients may become stale/disconnected for various reasons, e.g. network issues. The
38+
* {@link #afterConnectionClosed(WebSocketSession, CloseStatus)} is not necessarily called in those case.
39+
* To make sure the {@link #sockets} collection does not contain stale clients, a scheduled job runs once per hour to
40+
* ping all clients, and set the time when the pong response was received. Another scheduled job will check
41+
* the last received pong message timestamp and - if older than 70 minutes - consider the client session dead
42+
* and dispose of it.
43+
* </p>
4444
*/
4545
@Component
4646
public class WebSocketHandler extends TextWebSocketHandler {
@@ -49,7 +49,7 @@ public class WebSocketHandler extends TextWebSocketHandler {
4949
* List of active {@link WebSocket}
5050
*/
5151
@SuppressWarnings("unused")
52-
private List<WebSocket> sockets = new ArrayList<>();
52+
private List<WebSocket> sockets = Collections.synchronizedList(new ArrayList<>());
5353

5454
@SuppressWarnings("unused")
5555
@Autowired
@@ -87,7 +87,8 @@ public void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMe
8787
*/
8888
@Override
8989
public void afterConnectionEstablished(@NonNull WebSocketSession session) {
90-
logger.log(Level.INFO, "Opening web socket session from remote " + session.getRemoteAddress().getAddress());
90+
InetSocketAddress inetSocketAddress = session.getRemoteAddress();
91+
logger.log(Level.INFO, "Opening web socket session from remote " + (inetSocketAddress != null ? inetSocketAddress.getAddress().toString() : "<unknown IP address>"));
9192
WebSocket webSocket = new WebSocket(objectMapper, session);
9293
sockets.add(webSocket);
9394
}
@@ -104,7 +105,7 @@ public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull Cl
104105
Optional<WebSocket> webSocketOptional =
105106
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
106107
if (webSocketOptional.isPresent()) {
107-
logger.log(Level.INFO, "Closing web socket session from remote " + session.getRemoteAddress().getAddress());
108+
logger.log(Level.INFO, "Closing web socket session " + webSocketOptional.get().getDescription());
108109
webSocketOptional.get().dispose();
109110
sockets.remove(webSocketOptional.get());
110111
}
@@ -126,20 +127,20 @@ public void handleTransportError(@NonNull WebSocketSession session, @NonNull Thr
126127
}
127128

128129
/**
129-
* Called when client sends ping message, i.e. a pong message is sent and time for last message
130+
* Called when client sends ping message, i.e. a pong message is sent and time for last pong response message
130131
* in the {@link WebSocket} instance is refreshed.
131132
*
132133
* @param session Associated {@link WebSocketSession}
133134
* @param message See {@link PongMessage}
134135
*/
135136
@Override
136137
protected void handlePongMessage(@NonNull WebSocketSession session, @NonNull PongMessage message) {
137-
logger.log(Level.INFO, "Got pong");
138+
logger.log(Level.FINE, "Got pong for session " + session.getId());
138139
// Find the WebSocket instance associated with this WebSocketSession
139140
Optional<WebSocket> webSocketOptional =
140141
sockets.stream().filter(webSocket -> webSocket.getId().equals(session.getId())).findFirst();
141-
if (webSocketOptional.isEmpty()) {
142-
return; // Should only happen in case of timing issues?
142+
if (webSocketOptional.isPresent()) {
143+
webSocketOptional.get().setLastPinged(Instant.now());
143144
}
144145
}
145146

@@ -156,7 +157,7 @@ private String shorten(final String message) {
156157
@PreDestroy
157158
public void cleanup() {
158159
sockets.forEach(s -> {
159-
logger.log(Level.INFO, "Disposing socket " + s.getId());
160+
logger.log(Level.INFO, "Disposing socket " + s.getDescription());
160161
s.dispose();
161162
});
162163
}
@@ -170,4 +171,41 @@ public void sendMessage(SaveAndRestoreWebSocketMessage webSocketMessage) {
170171
}
171172
});
172173
}
174+
175+
/**
176+
* Sends a ping message to all clients contained in {@link #sockets}.
177+
* <p>
178+
* This is scheduled to run at the top of each hour, i.e. 00.00, 01.00...23.00
179+
* </p>
180+
*
181+
*/
182+
@SuppressWarnings("unused")
183+
@Scheduled(cron = "* 0 * * * *")
184+
public void pingClients(){
185+
sockets.forEach(WebSocket::sendPing);
186+
}
187+
188+
/**
189+
* For each client in {@link #sockets}, checks the timestamp of last received pong message. If this is older
190+
* than 70 minutes, the socket is considered dead, and then disposed.
191+
* <p>
192+
* This is scheduled to run 5 minutes past each hour, i.e. 00.05, 01.05...23.05
193+
* </p>
194+
*
195+
*/
196+
@SuppressWarnings("unused")
197+
@Scheduled(cron = "* 5 * * * *")
198+
public void cleanUpDeadSockets(){
199+
List<WebSocket> deadSockets = new ArrayList<>();
200+
Instant now = Instant.now();
201+
sockets.forEach(s -> {
202+
if(s.getLastPinged() != null && s.getLastPinged().isBefore(now.minus(70, ChronoUnit.MINUTES))){
203+
deadSockets.add(s);
204+
}
205+
});
206+
deadSockets.forEach(d -> {
207+
sockets.remove(d);
208+
d.dispose();
209+
});
210+
}
173211
}

0 commit comments

Comments
 (0)