3333import java .util .concurrent .ConcurrentMap ;
3434import java .util .concurrent .CopyOnWriteArrayList ;
3535import java .util .concurrent .CountDownLatch ;
36- import java .util .concurrent .TimeUnit ;
37- import java .util .concurrent .atomic .AtomicBoolean ;
3836
3937public class NettyWebSocket implements WebSocketConfigurer , WebSocket , ChannelFutureListener {
4038 /** All connected websocket. */
@@ -168,7 +166,7 @@ private void handleMessage(WebSocketFrame frame) {
168166 }
169167 WebSocketMessage message = WebSocketMessage .create (getContext (), array (content ));
170168
171- fireCallback (webSocketTask (() -> messageCallback .onMessage (this , message )));
169+ fireCallback (webSocketTask (() -> messageCallback .onMessage (this , message ), false ));
172170 } else {
173171 buffer = Unpooled .copiedBuffer (frame .content ());
174172 }
@@ -177,21 +175,13 @@ private void handleMessage(WebSocketFrame frame) {
177175 }
178176 }
179177
180- private void waitForConnect () {
181- try {
182- ready .await ();
183- } catch (InterruptedException x ) {
184- Thread .currentThread ().interrupt ();
185- }
186- }
187-
188178 private void handleClose (WebSocketCloseStatus closeStatus ) {
189179 try {
190180 if (isOpen ()) {
191181 if (onCloseCallback != null ) {
192182 Runnable closeCallback = () -> {
193183 try {
194- webSocketTask (() -> onCloseCallback .onClose (this , closeStatus )).run ();
184+ webSocketTask (() -> onCloseCallback .onClose (this , closeStatus ), false ).run ();
195185 } finally {
196186 netty .ctx .channel ()
197187 .writeAndFlush (
@@ -228,31 +218,16 @@ private void handleError(Throwable x) {
228218 }
229219
230220 void fireConnect () {
231- // fire only once
232221 addSession (this );
233222 if (connectCallback != null ) {
234223 fireCallback (webSocketTask (() -> {
235- try {
236- connectCallback .onConnect (this );
237- } finally {
238- ready .countDown ();
239- }
240- }));
224+ connectCallback .onConnect (this );
225+ }, true ));
241226 } else {
242227 ready .countDown ();
243228 }
244229 }
245230
246- private Runnable webSocketTask (Runnable runnable ) {
247- return () -> {
248- try {
249- runnable .run ();
250- } catch (Throwable x ) {
251- handleError (x );
252- }
253- };
254- }
255-
256231 private void fireCallback (Runnable task ) {
257232 if (dispatch ) {
258233 Router router = netty .getRouter ();
@@ -300,4 +275,25 @@ private void removeSession(NettyWebSocket ws) {
300275 }
301276 }
302277
278+ private Runnable webSocketTask (Runnable runnable , boolean isInit ) {
279+ return () -> {
280+ try {
281+ runnable .run ();
282+ } catch (Throwable x ) {
283+ handleError (x );
284+ } finally {
285+ if (isInit ) {
286+ ready .countDown ();
287+ }
288+ }
289+ };
290+ }
291+
292+ private void waitForConnect () {
293+ try {
294+ ready .await ();
295+ } catch (InterruptedException x ) {
296+ Thread .currentThread ().interrupt ();
297+ }
298+ }
303299}
0 commit comments