22
33import lombok .SneakyThrows ;
44import org .java_websocket .client .WebSocketClient ;
5+ import org .java_websocket .framing .CloseFrame ;
56import org .java_websocket .handshake .ServerHandshake ;
67
78import java .net .URI ;
1314public class BlockingJavaWebSocketClient extends WebSocketClient {
1415
1516 private CountDownLatch connectLatch = new CountDownLatch (1 );
16- private CountDownLatch responseLatch = new CountDownLatch ( 1 ) ;
17+ private volatile CountDownLatch responseLatch ;
1718 private final AtomicReference <String > responseMessage = new AtomicReference <>();
1819 private final AtomicReference <byte []> responseBytesMessage = new AtomicReference <>();
1920 private volatile boolean connected = false ;
@@ -24,90 +25,137 @@ public BlockingJavaWebSocketClient(URI serverUri) {
2425
2526 @ Override
2627 public void onOpen (ServerHandshake handshake ) {
28+ System .out .println ("连接成功" );
2729 connected = true ;
2830 connectLatch .countDown ();
2931 }
3032
31- @ Override
3233 public void onMessage (String message ) {
34+ System .out .println ("收到消息: " + message );
3335 responseMessage .set (message );
34- responseLatch .countDown ();
36+ if (responseLatch != null ) {
37+ responseLatch .countDown ();
38+ }
3539 close ();
3640 }
3741
38- @ Override
3942 public void onMessage (ByteBuffer byteBuffer ) {
43+ System .out .println ("收到字节消息: " + byteBuffer );
4044 responseBytesMessage .set (byteBuffer .array ());
41- responseLatch .countDown ();
45+ if (responseLatch != null ) {
46+ responseLatch .countDown ();
47+ }
4248 close ();
4349 }
4450
45- @ Override
4651 public void onClose (int code , String reason , boolean remote ) {
47- responseLatch .countDown ();
48- connectLatch .countDown ();
52+ System .out .println ("连接关闭: " + code + " - " + reason );
4953 connected = false ;
54+ // Signal any waiting threads
55+ if (responseLatch != null ) {
56+ responseLatch .countDown ();
57+ }
58+ connectLatch .countDown ();
5059 }
5160
52- @ Override
5361 public void onError (Exception ex ) {
54- responseLatch .countDown ();
55- connectLatch .countDown ();
62+ System .out .println ("连接错误: " + ex .getMessage ());
5663 connected = false ;
57- }
58-
59- @ SneakyThrows
60- public static String sendRequestWaitResponse (String entrypoint , String message ) {
61- BlockingJavaWebSocketClient blockingJavaWebSocketClient = new BlockingJavaWebSocketClient (URI .create (entrypoint ));
62- return blockingJavaWebSocketClient .sendRequest (message );
63- }
64-
65- @ SneakyThrows
66- public static byte [] sendRequestWaitResponse (String entrypoint , ByteBuffer message ) {
67- BlockingJavaWebSocketClient blockingJavaWebSocketClient = new BlockingJavaWebSocketClient (URI .create (entrypoint ));
68- return blockingJavaWebSocketClient .sendRequest (message );
64+ // Signal any waiting threads
65+ if (responseLatch != null ) {
66+ responseLatch .countDown ();
67+ }
68+ connectLatch .countDown ();
69+ ex .printStackTrace ();
6970 }
7071
7172 public String sendRequest (String message ) throws InterruptedException {
72- connect ();
73- if (!connectLatch .await (5 , TimeUnit .SECONDS )) {
74- throw new InterruptedException ("Timeout during WebSocket connection." );
73+ // Connect if not already connected
74+ if (!connected && !isOpen ()) {
75+ connect ();
76+ if (!connectLatch .await (5 , TimeUnit .SECONDS )) {
77+ throw new InterruptedException ("Timeout during WebSocket connection." );
78+ }
7579 }
76- if (!connected ) {
80+
81+ if (!connected || !isOpen ()) {
7782 throw new IllegalStateException ("WebSocket connection is not open." );
7883 }
7984
85+ // Reset response data and create new response latch for this request
8086 responseMessage .set (null );
81- connectLatch = new CountDownLatch ( 1 );
87+ responseBytesMessage . set ( null );
8288 responseLatch = new CountDownLatch (1 );
89+
90+ // Send the message
8391 send (message );
8492
85- if (!responseLatch .await (5 , TimeUnit .SECONDS )) {
93+ // Wait for response
94+ if (!responseLatch .await (10 , TimeUnit .SECONDS )) {
8695 throw new InterruptedException ("Timeout waiting for WebSocket response." );
8796 }
97+
98+ // Check if connection was closed during wait
99+ if (!connected ) {
100+ throw new IllegalStateException ("WebSocket connection was closed while waiting for response." );
101+ }
102+
88103 return responseMessage .get ();
89104 }
90105
91106 public byte [] sendRequest (ByteBuffer message ) throws InterruptedException {
92- connect ();
93- if (!connectLatch .await (5 , TimeUnit .SECONDS )) {
94- throw new InterruptedException ("Timeout during WebSocket connection." );
107+ // Connect if not already connected
108+ if (!connected && !isOpen ()) {
109+ connect ();
110+ if (!connectLatch .await (5 , TimeUnit .SECONDS )) {
111+ throw new InterruptedException ("Timeout during WebSocket connection." );
112+ }
95113 }
96- if (!connected ) {
114+
115+ if (!connected || !isOpen ()) {
97116 throw new IllegalStateException ("WebSocket connection is not open." );
98117 }
99118
119+ // Reset response data and create new response latch for this request
120+ responseMessage .set (null );
100121 responseBytesMessage .set (null );
101- connectLatch = new CountDownLatch (1 );
102122 responseLatch = new CountDownLatch (1 );
123+
124+ // Send the message
103125 send (message );
104126
105- if (!responseLatch .await (5 , TimeUnit .SECONDS )) {
127+ // Wait for response
128+ if (!responseLatch .await (10 , TimeUnit .SECONDS )) {
106129 throw new InterruptedException ("Timeout waiting for WebSocket response." );
107130 }
131+
132+ // Check if connection was closed during wait
133+ if (!connected ) {
134+ throw new IllegalStateException ("WebSocket connection was closed while waiting for response." );
135+ }
136+
108137 return responseBytesMessage .get ();
109138 }
110139
140+ public void disconnect () {
141+ if (connected && isOpen ()) {
142+ close ();
143+ }
144+ }
145+
146+
147+ @ SneakyThrows
148+ public static String sendRequestWaitResponse (String entrypoint , String message ) {
149+ BlockingJavaWebSocketClient blockingJavaWebSocketClient = new BlockingJavaWebSocketClient (URI .create (entrypoint ));
150+ return blockingJavaWebSocketClient .sendRequest (message );
151+ }
152+
153+ @ SneakyThrows
154+ public static byte [] sendRequestWaitResponse (String entrypoint , ByteBuffer message ) {
155+ BlockingJavaWebSocketClient blockingJavaWebSocketClient = new BlockingJavaWebSocketClient (URI .create (entrypoint ));
156+ return blockingJavaWebSocketClient .sendRequest (message );
157+ }
158+
111159 public static void main (String [] args ) {
112160 String uri = "ws://localhost:8082/app/fuck" ;
113161 System .out .println ("Response 1: " + BlockingJavaWebSocketClient .sendRequestWaitResponse (uri , "id" ));
0 commit comments