Skip to content

Commit b6fa446

Browse files
authored
Merge pull request #92 from filecoin-project/feat/ws-harden
Harden websocket loop
2 parents 28c59fd + 96ed6ea commit b6fa446

File tree

4 files changed

+77
-24
lines changed

4 files changed

+77
-24
lines changed

options_server.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package jsonrpc
33
import (
44
"context"
55
"reflect"
6+
"time"
67
)
78

89
type ParamDecoder func(ctx context.Context, json []byte) (reflect.Value, error)
910

1011
type ServerConfig struct {
11-
paramDecoders map[reflect.Type]ParamDecoder
1212
maxRequestSize int64
13-
errors *Errors
13+
pingInterval time.Duration
14+
15+
paramDecoders map[reflect.Type]ParamDecoder
16+
errors *Errors
1417
}
1518

1619
type ServerOption func(c *ServerConfig)
@@ -19,6 +22,8 @@ func defaultServerConfig() ServerConfig {
1922
return ServerConfig{
2023
paramDecoders: map[reflect.Type]ParamDecoder{},
2124
maxRequestSize: DEFAULT_MAX_REQUEST_SIZE,
25+
26+
pingInterval: 5 * time.Second,
2227
}
2328
}
2429

@@ -39,3 +44,9 @@ func WithServerErrors(es Errors) ServerOption {
3944
c.errors = &es
4045
}
4146
}
47+
48+
func WithServerPingInterval(d time.Duration) ServerOption {
49+
return func(c *ServerConfig) {
50+
c.pingInterval = d
51+
}
52+
}

rpc_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ func init() {
2727
if err := logging.SetLogLevel("rpc", "DEBUG"); err != nil {
2828
panic(err)
2929
}
30+
31+
debugTrace = true
3032
}
3133

3234
type SimpleServerHandler struct {

server.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"reflect"
99
"strings"
10+
"time"
1011

1112
"github.com/gorilla/websocket"
1213
)
@@ -28,6 +29,7 @@ type RPCServer struct {
2829

2930
paramDecoders map[reflect.Type]ParamDecoder
3031

32+
pingInterval time.Duration
3133
maxRequestSize int64
3234
}
3335

@@ -44,6 +46,8 @@ func NewServer(opts ...ServerOption) *RPCServer {
4446
paramDecoders: config.paramDecoders,
4547
maxRequestSize: config.maxRequestSize,
4648
errors: config.errors,
49+
50+
pingInterval: config.pingInterval,
4751
}
4852
}
4953

@@ -63,19 +67,20 @@ func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http
6367

6468
c, err := upgrader.Upgrade(w, r, nil)
6569
if err != nil {
66-
log.Error(err)
67-
w.WriteHeader(500)
70+
log.Errorw("upgrading connection", "error", err)
71+
// note: upgrader.Upgrade has already replied to the client with an HTTP error response on failure
6872
return
6973
}
7074

7175
(&wsConn{
72-
conn: c,
73-
handler: s,
74-
exiting: make(chan struct{}),
76+
conn: c,
77+
handler: s,
78+
pingInterval: s.pingInterval,
79+
exiting: make(chan struct{}),
7580
}).handleWsConn(ctx)
7681

7782
if err := c.Close(); err != nil {
78-
log.Error(err)
83+
log.Errorw("closing websocket connection", "error", err)
7984
return
8085
}
8186
}

websocket.go

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"fmt"
78
"io"
89
"io/ioutil"
10+
"os"
911
"reflect"
1012
"sync"
1113
"sync/atomic"
@@ -19,6 +21,8 @@ const wsCancel = "xrpc.cancel"
1921
const chValue = "xrpc.ch.val"
2022
const chClose = "xrpc.ch.close"
2123

24+
var debugTrace = os.Getenv("JSONRPC_ENABLE_DEBUG_TRACE") == "1"
25+
2226
type frame struct {
2327
// common
2428
Jsonrpc string `json:"jsonrpc"`
@@ -92,11 +96,7 @@ type wsConn struct {
9296

9397
// nextMessage waits for one message and puts it on the incoming channel
9498
func (c *wsConn) nextMessage() {
95-
if c.timeout > 0 {
96-
if err := c.conn.SetReadDeadline(time.Now().Add(c.timeout)); err != nil {
97-
log.Error("setting read deadline", err)
98-
}
99-
}
99+
c.resetReadDeadline()
100100
msgType, r, err := c.conn.NextReader()
101101
if err != nil {
102102
c.incomingErr = err
@@ -135,6 +135,10 @@ func (c *wsConn) sendRequest(req request) error {
135135
c.writeLk.Lock()
136136
defer c.writeLk.Unlock()
137137

138+
if debugTrace {
139+
log.Debugw("sendRequest", "req", req.Method, "id", req.ID)
140+
}
141+
138142
if err := c.conn.WriteJSON(req); err != nil {
139143
return err
140144
}
@@ -275,10 +279,11 @@ func (c *wsConn) handleChanOut(ch reflect.Value, req interface{}) error {
275279

276280
// handleCtxAsync handles context lifetimes for client
277281
// TODO: this should be aware of events going through chanHandlers, and quit
278-
// when the related channel is closed.
279-
// This should also probably be a single goroutine,
280-
// Note that not doing this should be fine for now as long as we are using
281-
// contexts correctly (cancelling when async functions are no longer is use)
282+
//
283+
// when the related channel is closed.
284+
// This should also probably be a single goroutine,
285+
// Note that not doing this should be fine for now as long as we are using
286+
// contexts correctly (cancelling when async functions are no longer in use)
282287
func (c *wsConn) handleCtxAsync(actx context.Context, id interface{}) {
283288
<-actx.Done()
284289

@@ -486,6 +491,14 @@ func (c *wsConn) setupPings() func() {
486491
}
487492
return nil
488493
})
494+
c.conn.SetPingHandler(func(appData string) error {
495+
// treat pings as pongs - this lets us register server activity even if it's too busy to respond to our pings
496+
select {
497+
case c.pongs <- struct{}{}:
498+
default:
499+
}
500+
return nil
501+
})
489502

490503
stop := make(chan struct{})
491504

@@ -600,8 +613,13 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
600613
timeoutCh = timeoutTimer.C
601614
}
602615

616+
start := time.Now()
617+
action := ""
618+
603619
select {
604620
case r, ok := <-c.incoming:
621+
action = "incoming"
622+
605623
err := c.incomingErr
606624

607625
if ok {
@@ -611,9 +629,11 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
611629
var frame frame
612630
if err = json.NewDecoder(r).Decode(&frame); err == nil {
613631
if frame.ID, err = normalizeID(frame.ID); err == nil {
632+
action = fmt.Sprintf("incoming(%s,%v)", frame.Method, frame.ID)
633+
614634
c.handleFrame(ctx, frame)
615635
go c.nextMessage()
616-
continue
636+
break
617637
}
618638
}
619639
}
@@ -628,6 +648,8 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
628648
return // failed to reconnect
629649
}
630650
case req := <-c.requests:
651+
action = fmt.Sprintf("send-request(%s,%v)", req.req.Method, req.req.ID)
652+
631653
c.writeLk.Lock()
632654
if req.req.ID != nil {
633655
if c.incomingErr != nil { // No conn?, immediate fail
@@ -649,11 +671,9 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
649671
log.Errorf("sendReqest failed (Handle me): %s", err)
650672
}
651673
case <-c.pongs:
652-
if c.timeout > 0 {
653-
if err := c.conn.SetReadDeadline(time.Now().Add(c.timeout)); err != nil {
654-
log.Error("setting read deadline", err)
655-
}
656-
}
674+
action = "pong"
675+
676+
c.resetReadDeadline()
657677
case <-timeoutCh:
658678
if c.pingInterval == 0 {
659679
// pings not running, this is perfectly normal
@@ -665,7 +685,7 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
665685
log.Warnw("timed-out websocket close error", "error", err)
666686
}
667687
c.writeLk.Unlock()
668-
log.Errorw("Connection timeout", "remote", c.conn.RemoteAddr())
688+
log.Errorw("Connection timeout", "remote", c.conn.RemoteAddr(), "lastAction", action)
669689
// The server side does not perform the reconnect operation, so we need to exit
670690
if c.connFactory == nil {
671691
return
@@ -684,6 +704,21 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
684704
c.writeLk.Unlock()
685705
return
686706
}
707+
708+
if c.pingInterval > 0 && time.Since(start) > c.pingInterval*2 {
709+
log.Warnw("websocket long time no response", "lastAction", action, "time", time.Since(start))
710+
}
711+
if debugTrace {
712+
log.Debugw("websocket action", "lastAction", action, "time", time.Since(start))
713+
}
714+
}
715+
}
716+
717+
func (c *wsConn) resetReadDeadline() {
718+
if c.timeout > 0 {
719+
if err := c.conn.SetReadDeadline(time.Now().Add(c.timeout)); err != nil {
720+
log.Error("setting read deadline", err)
721+
}
687722
}
688723
}
689724

0 commit comments

Comments
 (0)