Skip to content

Commit d98c42c

Browse files
authored
rpc: send websocket ping when connection is idle (#21142)
* rpc: send websocket ping when connection is idle * rpc: use non-blocking send for websocket pingReset
1 parent 723bd8c commit d98c42c

File tree

1 file changed

+63
-3
lines changed

1 file changed

+63
-3
lines changed

rpc/websocket.go

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,18 @@ import (
2525
"os"
2626
"strings"
2727
"sync"
28+
"time"
2829

2930
mapset "github.com/deckarep/golang-set"
3031
"github.com/ethereum/go-ethereum/log"
3132
"github.com/gorilla/websocket"
3233
)
3334

3435
const (
35-
wsReadBuffer = 1024
36-
wsWriteBuffer = 1024
36+
wsReadBuffer = 1024
37+
wsWriteBuffer = 1024
38+
wsPingInterval = 60 * time.Second
39+
wsPingWriteTimeout = 5 * time.Second
3740
)
3841

3942
var wsBufferPool = new(sync.Pool)
@@ -168,7 +171,64 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) {
168171
return endpointURL.String(), header, nil
169172
}
170173

174+
type websocketCodec struct {
175+
*jsonCodec
176+
conn *websocket.Conn
177+
178+
wg sync.WaitGroup
179+
pingReset chan struct{}
180+
}
181+
171182
func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
172183
conn.SetReadLimit(maxRequestContentLength)
173-
return NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON)
184+
wc := &websocketCodec{
185+
jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
186+
conn: conn,
187+
pingReset: make(chan struct{}, 1),
188+
}
189+
wc.wg.Add(1)
190+
go wc.pingLoop()
191+
return wc
192+
}
193+
194+
func (wc *websocketCodec) close() {
195+
wc.jsonCodec.close()
196+
wc.wg.Wait()
197+
}
198+
199+
func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error {
200+
err := wc.jsonCodec.writeJSON(ctx, v)
201+
if err == nil {
202+
// Notify pingLoop to delay the next idle ping.
203+
select {
204+
case wc.pingReset <- struct{}{}:
205+
default:
206+
}
207+
}
208+
return err
209+
}
210+
211+
// pingLoop sends periodic ping frames when the connection is idle.
212+
func (wc *websocketCodec) pingLoop() {
213+
var timer = time.NewTimer(wsPingInterval)
214+
defer wc.wg.Done()
215+
defer timer.Stop()
216+
217+
for {
218+
select {
219+
case <-wc.closed():
220+
return
221+
case <-wc.pingReset:
222+
if !timer.Stop() {
223+
<-timer.C
224+
}
225+
timer.Reset(wsPingInterval)
226+
case <-timer.C:
227+
wc.jsonCodec.encMu.Lock()
228+
wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout))
229+
wc.conn.WriteMessage(websocket.PingMessage, nil)
230+
wc.jsonCodec.encMu.Unlock()
231+
timer.Reset(wsPingInterval)
232+
}
233+
}
174234
}

0 commit comments

Comments
 (0)