@@ -51,6 +51,7 @@ type wsConn struct {
5151 handler * RPCServer
5252 requests <- chan clientRequest
5353 pongs chan struct {}
54+ stopPings func ()
5455 stop <- chan struct {}
5556 exiting chan struct {}
5657
@@ -511,6 +512,50 @@ func (c *wsConn) setupPings() func() {
511512 }
512513}
513514
515+ // returns true if reconnected
516+ func (c * wsConn ) tryReconnect (ctx context.Context ) bool {
517+ if c .connFactory == nil { // server side
518+ return false
519+ }
520+
521+ // connection dropped unexpectedly, do our best to recover it
522+ c .closeInFlight ()
523+ c .closeChans ()
524+ c .incoming = make (chan io.Reader ) // listen again for responses
525+ go func () {
526+ c .stopPings ()
527+
528+ attempts := 0
529+ var conn * websocket.Conn
530+ for conn == nil {
531+ time .Sleep (c .reconnectBackoff .next (attempts ))
532+ var err error
533+ if conn , err = c .connFactory (); err != nil {
534+ log .Debugw ("websocket connection retry failed" , "error" , err )
535+ }
536+ select {
537+ case <- ctx .Done ():
538+ break
539+ default :
540+ continue
541+ }
542+ attempts ++
543+ }
544+
545+ c .writeLk .Lock ()
546+ c .conn = conn
547+ c .incomingErr = nil
548+
549+ c .stopPings = c .setupPings ()
550+
551+ c .writeLk .Unlock ()
552+
553+ go c .nextMessage ()
554+ }()
555+
556+ return true
557+ }
558+
514559func (c * wsConn ) handleWsConn (ctx context.Context ) {
515560 c .incoming = make (chan io.Reader )
516561 c .inflight = map [int64 ]clientRequest {}
@@ -530,8 +575,8 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
530575
531576 // setup pings
532577
533- stopPings : = c .setupPings ()
534- defer stopPings ()
578+ c . stopPings = c .setupPings ()
579+ defer c . stopPings ()
535580
536581 var timeoutTimer * time.Timer
537582 if c .timeout != 0 {
@@ -557,62 +602,30 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
557602
558603 select {
559604 case r , ok := <- c .incoming :
560- if ! ok {
561- if c .incomingErr != nil {
562- log .Debugw ("websocket error" , "error" , c .incomingErr )
563- // only client needs to reconnect
564- if c .connFactory != nil {
565- // connection dropped unexpectedly, do our best to recover it
566- c .closeInFlight ()
567- c .closeChans ()
568- c .incoming = make (chan io.Reader ) // listen again for responses
569- go func () {
570- stopPings ()
571-
572- attempts := 0
573- var conn * websocket.Conn
574- for conn == nil {
575- time .Sleep (c .reconnectBackoff .next (attempts ))
576- var err error
577- if conn , err = c .connFactory (); err != nil {
578- log .Debugw ("websocket connection retry failed" , "error" , err )
579- }
580- select {
581- case <- ctx .Done ():
582- break
583- default :
584- continue
585- }
586- attempts ++
587- }
588-
589- c .writeLk .Lock ()
590- c .conn = conn
591- c .incomingErr = nil
592-
593- stopPings = c .setupPings ()
594-
595- c .writeLk .Unlock ()
596-
597- go c .nextMessage ()
598- }()
599- continue
600- }
605+ err := c .incomingErr
606+
607+ if ok {
608+ // debug util - dump all messages to stderr
609+ // r = io.TeeReader(r, os.Stderr)
610+
611+ var frame frame
612+ err = json .NewDecoder (r ).Decode (& frame )
613+ if err == nil {
614+ c .handleFrame (ctx , frame )
615+ go c .nextMessage ()
616+ continue
601617 }
602- return // remote closed
603618 }
604619
605- // debug util - dump all messages to stderr
606- // r = io.TeeReader(r, os.Stderr)
607-
608- var frame frame
609- if err := json .NewDecoder (r ).Decode (& frame ); err != nil {
610- log .Error ("handle me:" , err )
611- return
620+ if err == nil {
621+ return // remote closed
612622 }
613623
614- c .handleFrame (ctx , frame )
615- go c .nextMessage ()
624+ log .Errorw ("websocket error" , "error" , err )
625+ // only client needs to reconnect
626+ if ! c .tryReconnect (ctx ) {
627+ return // failed to reconnect
628+ }
616629 case req := <- c .requests :
617630 c .writeLk .Lock ()
618631 if req .req .ID != nil {
0 commit comments