@@ -124,8 +124,8 @@ Client.prototype.connect = function() {
124
124
return this ;
125
125
}
126
126
this . token = new Token ( this . config )
127
- . on ( 'readable ' , this . onTokenReadable . bind ( this ) )
128
- . on ( 'error' , this . onTokenError . bind ( this ) ) ;
127
+ . on ( 'data ' , this . onTokenReadable . bind ( this ) )
128
+ . on ( 'error' , this . onError . bind ( this ) ) ;
129
129
return this ;
130
130
} ;
131
131
@@ -558,7 +558,7 @@ Client.prototype.read = function(path, params, fn) {
558
558
559
559
if ( ! this . readable ) {
560
560
debug ( '%s not readable. queueing read' , this . _id , path , params ) ;
561
- return this . on ( 'readable' , this . read . bind ( this , path , params , fn ) ) ;
561
+ return this . once ( 'readable' , this . read . bind ( this , path , params , fn ) ) ;
562
562
}
563
563
564
564
if ( typeof params === 'function' ) {
@@ -618,14 +618,14 @@ Client.prototype.ender = function(fn) {
618
618
* @api private
619
619
*/
620
620
Client . prototype . onTokenReadable = function ( ) {
621
- debug ( 'token now readable' ) ;
622
- this . readable = true ;
623
- this . context . set ( this . token . session ) ;
624
621
if ( ! this . io ) {
625
622
this . _initSocketio ( ) ;
626
623
}
627
- this . emit ( 'readable' , this . token . session ) ;
628
- debug ( 'emitted readable on client' ) ;
624
+ debug ( 'authenticating with scout-server socket.io transport...' ) ;
625
+ this . io . emit ( 'authenticate' , {
626
+ token : this . token . toString ( )
627
+ } ) ;
628
+
629
629
630
630
if ( ! this . original ) {
631
631
this . emit ( 'change' ) ;
@@ -642,30 +642,27 @@ Client.prototype.onTokenReadable = function() {
642
642
}
643
643
} ;
644
644
645
- /**
646
- * We couldn't get a token.
647
- *
648
- * @api private
649
- */
650
- Client . prototype . onTokenError = function ( err ) {
651
- debug ( 'Could not get token. Server not running?' , err ) ;
652
- this . emit ( 'error' , err ) ;
653
- } ;
654
-
655
645
Client . prototype . _initSocketio = function ( ) {
656
646
if ( this . io ) return ;
657
647
658
- this . io = socketio ( this . config . scout , {
659
- query : 'token=' + this . token . toString ( )
660
- } ) ;
648
+ this . io = socketio ( this . config . scout ) ;
661
649
this . io . on ( 'reconnecting' , this . emit . bind ( this , 'reconnecting' ) )
662
650
. on ( 'reconnect' , this . onReconnect . bind ( this ) )
663
651
. on ( 'reconnect_attempt' , this . emit . bind ( this , 'reconnect_attempt' ) )
664
652
. on ( 'reconnect_failed' , this . emit . bind ( this , 'reconnect_failed' ) )
665
653
. on ( 'disconnect' , this . emit . bind ( this , 'disconnect' ) ) ;
666
654
this . io . on ( 'connect' , function ( ) {
667
655
debug ( 'connected to scout-server socket' ) ;
668
- } ) ;
656
+ this . io . on ( 'authenticated' , function ( ) {
657
+ debug ( 'Success! Now authenticated with scout-server socket.io transport' ) ;
658
+
659
+ this . readable = true ;
660
+ this . context . set ( this . token . session ) ;
661
+ debug ( 'now ready for use' ) ;
662
+ this . emit ( 'readable' , this . token . session ) ;
663
+ } . bind ( this ) ) ;
664
+ } . bind ( this ) ) ;
665
+ this . io . on ( 'error' , this . onError . bind ( this ) ) ;
669
666
} ;
670
667
671
668
/**
@@ -689,20 +686,19 @@ Client.prototype.onUnload = function() {
689
686
* @param {Error } err
690
687
* @api private
691
688
*/
692
- Client . prototype . onTokenError = function ( err ) {
693
- this . dead = err ;
694
- if ( err >= 500 ) {
695
- this . dead . message += ' (scout-server dead at ' + this . config . scout + '?)' ;
689
+ Client . prototype . onError = function ( err ) {
690
+ if ( err . message === 'Origin is not allowed by Access-Control-Allow-Origin' ) {
691
+ err . message = 'scout-server not running at ' + this . config . scout + '?' ;
696
692
}
693
+ // @todo : Exponential back-off to retry connecting.
694
+ this . dead = err ;
697
695
this . emit ( 'error' , err ) ;
698
696
} ;
699
697
700
698
Client . prototype . onReconnect = function ( ) {
701
699
debug ( 'reconnected. getting new token' ) ;
702
700
this . token = new Token ( this . config )
703
701
. on ( 'readable' , function ( ) {
704
- this . readable = true ;
705
- this . context . set ( this . token . session ) ;
706
702
this . _initSocketio ( ) ;
707
703
} . bind ( this ) )
708
704
. on ( 'error' , this . emit . bind ( this , 'error' ) ) ;
@@ -736,7 +732,7 @@ Client.prototype.createReadStream = function(_id, data) {
736
732
return debug ( 'proxy already transferred' ) ;
737
733
}
738
734
} ) ;
739
- this . on ( 'readable' , function ( ) {
735
+ this . once ( 'readable' , function ( ) {
740
736
debug ( 'client readable' ) ;
741
737
var src = client . createReadStream ( _id , data ) ;
742
738
src . on ( 'data' , proxy . emit . bind ( proxy , 'data' ) ) ;
@@ -752,6 +748,10 @@ Client.prototype.createReadStream = function(_id, data) {
752
748
data = data || { } ;
753
749
var stream = ss . createStream ( this . io ) ;
754
750
ss ( this . io ) . emit ( _id , stream , data ) ;
755
- return stream . pipe ( EJSON . createParseStream ( ) ) ;
751
+
752
+ var parser = EJSON . createParseStream ( ) ;
753
+ var res = stream . pipe ( parser ) ;
754
+ stream . on ( 'error' , res . emit . bind ( res , 'error' ) ) ;
755
+ return res ;
756
756
}
757
757
} ;
0 commit comments