4646 * <li>Publish a topic named /path/web-socket/messages, where path is optional.</li>
4747 * </ul>
4848 * </p>
49+ * <p>
50+ * <b>NOTE:</b> client code <i>must</i> call the {@link #shutdown()} method to not
51+ * leave the web socket connection alive.
52+ * </p>
4953 */
5054public class WebSocketClientService {
5155
@@ -122,26 +126,18 @@ public void sendEcho(String message) {
122126 if (stompSession != null && stompSession .isConnected () && echoEndpoint != null ) {
123127 stompSession .send (echoEndpoint , message );
124128 }
125-
126129 }
127130
128131 /**
129- * Disconnects the STOMP session if non-null and connected .
132+ * Disconnects the socket if connected and terminates connection thread .
130133 */
131- public void disconnect () {
134+ public synchronized void shutdown () {
135+ attemptReconnect .set (false );
132136 if (stompSession != null && stompSession .isConnected ()) {
133137 stompSession .disconnect ();
134138 }
135139 }
136140
137- /**
138- * Disconnects the socket if connected and terminates connection thread.
139- */
140- public void shutdown () {
141- attemptReconnect .set (false );
142- disconnect ();
143- }
144-
145141 /**
146142 * Attempts to connect to the remote peer, both in initial connection and in a reconnection scenario.
147143 * If connection fails, new attempts are made every 10s until successful.
@@ -156,23 +152,25 @@ public void connect() {
156152 stompClient .setTaskScheduler (threadPoolTaskScheduler );
157153 stompClient .setDefaultHeartbeat (new long []{30000 , 30000 });
158154 StompSessionHandler sessionHandler = new StompSessionHandler ();
155+ logger .log (Level .INFO , "Attempting web socket connection to " + connectUrl );
159156 new Thread (() -> {
160157 while (attemptReconnect .get ()) {
161- logger .log (Level .INFO , "Attempting web socket connection to " + connectUrl );
162158 try {
163- stompSession = stompClient .connect (connectUrl , sessionHandler ).get ();
164- stompSession .subscribe (this .subscriptionEndpoint , new StompFrameHandler () {
165- @ Override
166- public Type getPayloadType (StompHeaders headers ) {
167- return String .class ;
168- }
159+ synchronized (WebSocketClientService .this ) {
160+ stompSession = stompClient .connect (connectUrl , sessionHandler ).get ();
161+ stompSession .subscribe (this .subscriptionEndpoint , new StompFrameHandler () {
162+ @ Override
163+ public Type getPayloadType (StompHeaders headers ) {
164+ return String .class ;
165+ }
169166
170- @ Override
171- public void handleFrame (StompHeaders headers , Object payload ) {
172- logger .log (Level .INFO , "Handling subscription frame: " + payload );
173- webSocketMessageHandlers .forEach (h -> h .handleWebSocketMessage ((String ) payload ));
174- }
175- });
167+ @ Override
168+ public void handleFrame (StompHeaders headers , Object payload ) {
169+ logger .log (Level .INFO , "Handling subscription frame: " + payload );
170+ webSocketMessageHandlers .forEach (h -> h .handleWebSocketMessage ((String ) payload ));
171+ }
172+ });
173+ }
176174 } catch (Exception e ) {
177175 logger .log (Level .WARNING , "Got exception when trying to connect" , e );
178176 }
0 commit comments