22
33import io .socket .backo .Backoff ;
44import io .socket .emitter .Emitter ;
5+ import io .socket .parser .DecodingException ;
56import io .socket .parser .IOParser ;
67import io .socket .parser .Packet ;
78import io .socket .parser .Parser ;
1011import okhttp3 .WebSocket ;
1112
1213import java .net .URI ;
13- import java .util .ArrayList ;
14- import java .util .Date ;
15- import java .util .HashSet ;
16- import java .util .LinkedList ;
17- import java .util .List ;
18- import java .util .Map ;
19- import java .util .Queue ;
20- import java .util .Set ;
21- import java .util .Timer ;
22- import java .util .TimerTask ;
14+ import java .util .*;
2315import java .util .concurrent .ConcurrentHashMap ;
2416import java .util .logging .Level ;
2517import java .util .logging .Logger ;
@@ -48,16 +40,6 @@ public class Manager extends Emitter {
4840 public static final String EVENT_PACKET = "packet" ;
4941 public static final String EVENT_ERROR = "error" ;
5042
51- /**
52- * Called on a connection error.
53- */
54- public static final String EVENT_CONNECT_ERROR = "connect_error" ;
55-
56- /**
57- * Called on a connection timeout.
58- */
59- public static final String EVENT_CONNECT_TIMEOUT = "connect_timeout" ;
60-
6143 /**
6244 * Called on a successful reconnection.
6345 */
@@ -72,12 +54,6 @@ public class Manager extends Emitter {
7254
7355 public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt" ;
7456
75- public static final String EVENT_RECONNECTING = "reconnecting" ;
76-
77- public static final String EVENT_PING = "ping" ;
78-
79- public static final String EVENT_PONG = "pong" ;
80-
8157 /**
8258 * Called when a new transport is created. (experimental)
8359 */
@@ -98,8 +74,6 @@ public class Manager extends Emitter {
9874 private double _randomizationFactor ;
9975 private Backoff backoff ;
10076 private long _timeout ;
101- private Set <Socket > connecting = new HashSet <Socket >();
102- private Date lastPing ;
10377 private URI uri ;
10478 private List <Packet > packetBuffer ;
10579 private Queue <On .Handle > subs ;
@@ -160,28 +134,6 @@ public Manager(URI uri, Options opts) {
160134 this .decoder = opts .decoder != null ? opts .decoder : new IOParser .Decoder ();
161135 }
162136
163- private void emitAll (String event , Object ... args ) {
164- this .emit (event , args );
165- for (Socket socket : this .nsps .values ()) {
166- socket .emit (event , args );
167- }
168- }
169-
170- /**
171- * Update `socket.id` of all sockets
172- */
173- private void updateSocketIds () {
174- for (Map .Entry <String , Socket > entry : this .nsps .entrySet ()) {
175- String nsp = entry .getKey ();
176- Socket socket = entry .getValue ();
177- socket .id = this .generateId (nsp );
178- }
179- }
180-
181- private String generateId (String nsp ) {
182- return ("/" .equals (nsp ) ? "" : (nsp + "#" )) + this .engine .id ();
183- }
184-
185137 public boolean reconnection () {
186138 return this ._reconnection ;
187139 }
@@ -307,7 +259,7 @@ public void call(Object... objects) {
307259 logger .fine ("connect_error" );
308260 self .cleanup ();
309261 self .readyState = ReadyState .CLOSED ;
310- self .emitAll ( EVENT_CONNECT_ERROR , data );
262+ self .emit ( EVENT_ERROR , data );
311263 if (fn != null ) {
312264 Exception err = new SocketIOException ("Connection error" ,
313265 data instanceof Exception ? (Exception ) data : null );
@@ -334,7 +286,6 @@ public void run() {
334286 openSub .destroy ();
335287 socket .close ();
336288 socket .emit (Engine .EVENT_ERROR , new SocketIOException ("timeout" ));
337- self .emitAll (EVENT_CONNECT_TIMEOUT , timeout );
338289 }
339290 });
340291 }
@@ -377,18 +328,6 @@ public void call(Object... objects) {
377328 }
378329 }
379330 }));
380- this .subs .add (On .on (socket , Engine .EVENT_PING , new Listener () {
381- @ Override
382- public void call (Object ... objects ) {
383- Manager .this .onping ();
384- }
385- }));
386- this .subs .add (On .on (socket , Engine .EVENT_PONG , new Listener () {
387- @ Override
388- public void call (Object ... objects ) {
389- Manager .this .onpong ();
390- }
391- }));
392331 this .subs .add (On .on (socket , Engine .EVENT_ERROR , new Listener () {
393332 @ Override
394333 public void call (Object ... objects ) {
@@ -409,22 +348,20 @@ public void call (Packet packet) {
409348 });
410349 }
411350
412- private void onping () {
413- this .lastPing = new Date ();
414- this .emitAll (EVENT_PING );
415- }
416-
417- private void onpong () {
418- this .emitAll (EVENT_PONG ,
419- null != this .lastPing ? new Date ().getTime () - this .lastPing .getTime () : 0 );
420- }
421-
422351 private void ondata (String data ) {
423- this .decoder .add (data );
352+ try {
353+ this .decoder .add (data );
354+ } catch (DecodingException e ) {
355+ this .onerror (e );
356+ }
424357 }
425358
426359 private void ondata (byte [] data ) {
427- this .decoder .add (data );
360+ try {
361+ this .decoder .add (data );
362+ } catch (DecodingException e ) {
363+ this .onerror (e );
364+ }
428365 }
429366
430367 private void ondecoded (Packet packet ) {
@@ -433,7 +370,7 @@ private void ondecoded(Packet packet) {
433370
434371 private void onerror (Exception err ) {
435372 logger .log (Level .FINE , "error" , err );
436- this .emitAll (EVENT_ERROR , err );
373+ this .emit (EVENT_ERROR , err );
437374 }
438375
439376 /**
@@ -444,41 +381,31 @@ private void onerror(Exception err) {
444381 * @return a socket instance for the namespace.
445382 */
446383 public Socket socket (final String nsp , Options opts ) {
447- Socket socket = this .nsps .get (nsp );
448- if (socket == null ) {
449- socket = new Socket (this , nsp , opts );
450- Socket _socket = this .nsps .putIfAbsent (nsp , socket );
451- if (_socket != null ) {
452- socket = _socket ;
453- } else {
454- final Manager self = this ;
455- final Socket s = socket ;
456- socket .on (Socket .EVENT_CONNECTING , new Listener () {
457- @ Override
458- public void call (Object ... args ) {
459- self .connecting .add (s );
460- }
461- });
462- socket .on (Socket .EVENT_CONNECT , new Listener () {
463- @ Override
464- public void call (Object ... objects ) {
465- s .id = self .generateId (nsp );
466- }
467- });
384+ synchronized (this .nsps ) {
385+ Socket socket = this .nsps .get (nsp );
386+ if (socket == null ) {
387+ socket = new Socket (this , nsp , opts );
388+ this .nsps .put (nsp , socket );
468389 }
390+ return socket ;
469391 }
470- return socket ;
471392 }
472393
473394 public Socket socket (String nsp ) {
474395 return socket (nsp , null );
475396 }
476397
477- /*package*/ void destroy (Socket socket ) {
478- this .connecting .remove (socket );
479- if (!this .connecting .isEmpty ()) return ;
398+ /*package*/ void destroy () {
399+ synchronized (this .nsps ) {
400+ for (Socket socket : this .nsps .values ()) {
401+ if (socket .isActive ()) {
402+ logger .fine ("socket is still active, skipping close" );
403+ return ;
404+ }
405+ }
480406
481- this .close ();
407+ this .close ();
408+ }
482409 }
483410
484411 /*package*/ void packet (Packet packet ) {
@@ -487,10 +414,6 @@ public Socket socket(String nsp) {
487414 }
488415 final Manager self = this ;
489416
490- if (packet .query != null && !packet .query .isEmpty () && packet .type == Parser .CONNECT ) {
491- packet .nsp += "?" + packet .query ;
492- }
493-
494417 if (!self .encoding ) {
495418 self .encoding = true ;
496419 this .encoder .encode (packet , new Parser .Encoder .Callback () {
@@ -528,7 +451,6 @@ private void cleanup() {
528451
529452 this .packetBuffer .clear ();
530453 this .encoding = false ;
531- this .lastPing = null ;
532454
533455 this .decoder .destroy ();
534456 }
@@ -569,7 +491,7 @@ private void reconnect() {
569491 if (this .backoff .getAttempts () >= this ._reconnectionAttempts ) {
570492 logger .fine ("reconnect failed" );
571493 this .backoff .reset ();
572- this .emitAll (EVENT_RECONNECT_FAILED );
494+ this .emit (EVENT_RECONNECT_FAILED );
573495 this .reconnecting = false ;
574496 } else {
575497 long delay = this .backoff .duration ();
@@ -587,8 +509,7 @@ public void run() {
587509
588510 logger .fine ("attempting reconnect" );
589511 int attempts = self .backoff .getAttempts ();
590- self .emitAll (EVENT_RECONNECT_ATTEMPT , attempts );
591- self .emitAll (EVENT_RECONNECTING , attempts );
512+ self .emit (EVENT_RECONNECT_ATTEMPT , attempts );
592513
593514 // check again for the case socket closed in above events
594515 if (self .skipReconnect ) return ;
@@ -600,7 +521,7 @@ public void call(Exception err) {
600521 logger .fine ("reconnect attempt error" );
601522 self .reconnecting = false ;
602523 self .reconnect ();
603- self .emitAll (EVENT_RECONNECT_ERROR , err );
524+ self .emit (EVENT_RECONNECT_ERROR , err );
604525 } else {
605526 logger .fine ("reconnect success" );
606527 self .onreconnect ();
@@ -625,8 +546,7 @@ private void onreconnect() {
625546 int attempts = this .backoff .getAttempts ();
626547 this .reconnecting = false ;
627548 this .backoff .reset ();
628- this .updateSocketIds ();
629- this .emitAll (EVENT_RECONNECT , attempts );
549+ this .emit (EVENT_RECONNECT , attempts );
630550 }
631551
632552
@@ -652,6 +572,7 @@ public static class Options extends io.socket.engineio.client.Socket.Options {
652572 public double randomizationFactor ;
653573 public Parser .Encoder encoder ;
654574 public Parser .Decoder decoder ;
575+ public Map <String , String > auth ;
655576
656577 /**
657578 * Connection timeout (ms). Set -1 to disable.
0 commit comments