Skip to content

Commit 503dd4c

Browse files
committed
Ping/pong mechanism to handle connection issues in web socket
1 parent 939154f commit 503dd4c

File tree

1 file changed

+60
-28
lines changed

1 file changed

+60
-28
lines changed

core/websocket/src/main/java/org/phoebus/core/websocket/WebSocketClient.java

Lines changed: 60 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,21 @@
99
import java.net.http.WebSocket;
1010
import java.nio.ByteBuffer;
1111
import java.util.concurrent.CompletionStage;
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
1214
import java.util.concurrent.atomic.AtomicBoolean;
1315
import java.util.function.Consumer;
1416
import java.util.logging.Level;
1517
import java.util.logging.Logger;
1618

1719
/**
1820
* A web socket client implementation supporting pong and text messages.
21+
*
22+
* <p>
23+
* Once connection is established, a ping/pong thread is set up to check peer availability. This should be
24+
* able to handle both remote peer being shut down and network issues. Ping messages are dispatched once
25+
* per minute. A reconnection loop is started if a pong message is not received from peer within three seconds.
26+
* </p>
1927
*/
2028
public class WebSocketClient implements WebSocket.Listener {
2129

@@ -25,10 +33,12 @@ public class WebSocketClient implements WebSocket.Listener {
2533
private Runnable disconnectCallback;
2634
private final URI uri;
2735
private final Consumer<CharSequence> onTextCallback;
28-
private final AtomicBoolean attemptConnect = new AtomicBoolean(true);
36+
37+
private final AtomicBoolean attemptReconnect = new AtomicBoolean();
38+
private CountDownLatch pingCountdownLatch;
2939

3040
/**
31-
* @param uri The URI of the web socket peer.
41+
* @param uri The URI of the web socket peer.
3242
* @param onTextCallback A callback method the API client will use to process web socket messages.
3343
*/
3444
public WebSocketClient(URI uri, Consumer<CharSequence> onTextCallback) {
@@ -40,7 +50,6 @@ public WebSocketClient(URI uri, Consumer<CharSequence> onTextCallback) {
4050
* Attempts to connect to the remote web socket.
4151
*/
4252
public void connect() {
43-
attemptConnect.set(true);
4453
doConnect();
4554
}
4655

@@ -49,23 +58,18 @@ public void connect() {
4958
* connection is established.
5059
*/
5160
private void doConnect() {
61+
attemptReconnect.set(true);
5262
new Thread(() -> {
53-
while (attemptConnect.get()) {
63+
while (attemptReconnect.get()) {
5464
logger.log(Level.INFO, "Attempting web socket connection to " + uri);
55-
try {
56-
webSocket = HttpClient.newBuilder()
57-
.build()
58-
.newWebSocketBuilder()
59-
.buildAsync(uri, this)
60-
.join();
61-
break;
62-
} catch (Exception e) {
63-
logger.log(Level.INFO, "Failed to connect to web socket on " + uri, e);
64-
}
65+
HttpClient.newBuilder()
66+
.build()
67+
.newWebSocketBuilder()
68+
.buildAsync(uri, this);
6569
try {
6670
Thread.sleep(10000);
6771
} catch (InterruptedException e) {
68-
logger.log(Level.WARNING, "Interrupted while sleeping");
72+
logger.log(Level.WARNING, "Got interrupted exception");
6973
}
7074
}
7175
}).start();
@@ -74,16 +78,19 @@ private void doConnect() {
7478
/**
7579
* Called when connection has been established. An API client may optionally register a
7680
* {@link #connectCallback} which is called when connection is opened.
77-
* @param webSocket
78-
* the WebSocket that has been connected
81+
*
82+
* @param webSocket the WebSocket that has been connected
7983
*/
8084
@Override
8185
public void onOpen(WebSocket webSocket) {
8286
WebSocket.Listener.super.onOpen(webSocket);
87+
attemptReconnect.set(false);
88+
this.webSocket = webSocket;
8389
if (connectCallback != null) {
8490
connectCallback.run();
8591
}
8692
logger.log(Level.INFO, "Connected to " + uri);
93+
new Thread(new PingRunnable()).start();
8794
}
8895

8996
/**
@@ -106,10 +113,10 @@ public void sendText(String message) {
106113
* {@link #disconnectCallback} which is called when connection is opened.
107114
*
108115
* <p>
109-
* Note that reconnection will be attempted immediately.
116+
* Note that reconnection will be attempted immediately.
110117
* </p>
111-
* @param webSocket
112-
* the WebSocket that has been connected
118+
*
119+
* @param webSocket the WebSocket that has been connected
113120
*/
114121
@Override
115122
public CompletionStage<?> onClose(WebSocket webSocket,
@@ -119,7 +126,6 @@ public CompletionStage<?> onClose(WebSocket webSocket,
119126
if (disconnectCallback != null) {
120127
disconnectCallback.run();
121128
}
122-
doConnect();
123129
return null;
124130
}
125131

@@ -128,13 +134,14 @@ public CompletionStage<?> onClose(WebSocket webSocket,
128134
* is called.
129135
*/
130136
public void sendPing() {
131-
logger.log(Level.INFO, "Sending ping");
137+
logger.log(Level.FINE, "Sending ping");
132138
webSocket.sendPing(ByteBuffer.allocate(0));
133139
}
134140

135141
@Override
136142
public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
137-
logger.log(Level.INFO, "Got pong");
143+
pingCountdownLatch.countDown();
144+
logger.log(Level.FINE, "Got pong");
138145
return WebSocket.Listener.super.onPong(webSocket, message);
139146
}
140147

@@ -156,19 +163,18 @@ public CompletionStage<?> onText(WebSocket webSocket,
156163
}
157164

158165
/**
159-
*
160166
* <b>NOTE:</b> this <b>must</b> be called by the API client when web socket messages are no longer
161167
* needed, otherwise reconnect attempts will continue as these run on a separate thread.
162168
*
163169
* <p>
164-
* The status code 1000 is used when calling the {@link WebSocket#sendClose(int, String)} method. See
165-
* list of common web socket status codes
166-
* <a href='https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent/code'>here</a>.
170+
* The status code 1000 is used when calling the {@link WebSocket#sendClose(int, String)} method. See
171+
* list of common web socket status codes
172+
* <a href='https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent/code'>here</a>.
167173
* </p>
174+
*
168175
* @param reason Custom reason text.
169176
*/
170177
public void close(String reason) {
171-
attemptConnect.set(false);
172178
webSocket.sendClose(1000, reason);
173179
}
174180

@@ -186,4 +192,30 @@ public void setConnectCallback(Runnable connectCallback) {
186192
public void setDisconnectCallback(Runnable disconnectCallback) {
187193
this.disconnectCallback = disconnectCallback;
188194
}
195+
196+
private class PingRunnable implements Runnable {
197+
198+
@Override
199+
public void run() {
200+
while (true) {
201+
pingCountdownLatch = new CountDownLatch(1);
202+
sendPing();
203+
try {
204+
if (!pingCountdownLatch.await(3, TimeUnit.SECONDS)) {
205+
if (disconnectCallback != null) {
206+
disconnectCallback.run();
207+
}
208+
logger.log(Level.WARNING, "No pong response within three seconds");
209+
doConnect();
210+
return;
211+
} else {
212+
Thread.sleep(60000);
213+
}
214+
} catch (InterruptedException e) {
215+
logger.log(Level.WARNING, "Got interrupted exception");
216+
return;
217+
}
218+
}
219+
}
220+
}
189221
}

0 commit comments

Comments
 (0)