Skip to content

Commit 82a9e11

Browse files
authored
ethstats: avoid concurrent write on websocket (#21404)
Fixes #21403
1 parent b35e4fc commit 82a9e11

File tree

1 file changed

+57
-11
lines changed

1 file changed

+57
-11
lines changed

ethstats/ethstats.go

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"runtime"
2929
"strconv"
3030
"strings"
31+
"sync"
3132
"time"
3233

3334
"github.com/ethereum/go-ethereum/common"
@@ -93,6 +94,49 @@ type Service struct {
9394

9495
pongCh chan struct{} // Pong notifications are fed into this channel
9596
histCh chan []uint64 // History request block numbers are fed into this channel
97+
98+
}
99+
100+
// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
101+
// websocket.
102+
// From Gorilla websocket docs:
103+
// Connections support one concurrent reader and one concurrent writer.
104+
// Applications are responsible for ensuring that no more than one goroutine calls the write methods
105+
// - NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel
106+
// concurrently and that no more than one goroutine calls the read methods
107+
// - NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler
108+
// concurrently.
109+
// The Close and WriteControl methods can be called concurrently with all other methods.
110+
//
111+
// The connWrapper uses a single mutex for both reading and writing.
112+
type connWrapper struct {
113+
conn *websocket.Conn
114+
mu sync.Mutex
115+
}
116+
117+
func newConnectionWrapper(conn *websocket.Conn) *connWrapper {
118+
return &connWrapper{conn: conn}
119+
}
120+
121+
// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling
122+
func (w *connWrapper) WriteJSON(v interface{}) error {
123+
w.mu.Lock()
124+
defer w.mu.Unlock()
125+
return w.conn.WriteJSON(v)
126+
}
127+
128+
// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling
129+
func (w *connWrapper) ReadJSON(v interface{}) error {
130+
w.mu.Lock()
131+
defer w.mu.Unlock()
132+
return w.conn.ReadJSON(v)
133+
}
134+
135+
// Close wraps corresponding method on the websocket but is safe for concurrent calling
136+
func (w *connWrapper) Close() error {
137+
// The Close and WriteControl methods can be called concurrently with all other methods,
138+
// so the mutex is not used here
139+
return w.conn.Close()
96140
}
97141

98142
// New returns a monitoring service ready for stats reporting.
@@ -204,17 +248,19 @@ func (s *Service) loop() {
204248
case <-errTimer.C:
205249
// Establish a websocket connection to the server on any supported URL
206250
var (
207-
conn *websocket.Conn
251+
conn *connWrapper
208252
err error
209253
)
210254
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
211255
header := make(http.Header)
212256
header.Set("origin", "http://localhost")
213257
for _, url := range urls {
214-
conn, _, err = dialer.Dial(url, header)
215-
if err == nil {
258+
c, _, e := dialer.Dial(url, header)
259+
if e == nil {
260+
conn = newConnectionWrapper(c)
216261
break
217262
}
263+
err = e
218264
}
219265
if err != nil {
220266
log.Warn("Stats server unreachable", "err", err)
@@ -282,7 +328,7 @@ func (s *Service) loop() {
282328
// from the network socket. If any of them match an active request, it forwards
283329
// it, if they themselves are requests it initiates a reply, and lastly it drops
284330
// unknown packets.
285-
func (s *Service) readLoop(conn *websocket.Conn) {
331+
func (s *Service) readLoop(conn *connWrapper) {
286332
// If the read loop exists, close the connection
287333
defer conn.Close()
288334

@@ -391,7 +437,7 @@ type authMsg struct {
391437
}
392438

393439
// login tries to authorize the client at the remote server.
394-
func (s *Service) login(conn *websocket.Conn) error {
440+
func (s *Service) login(conn *connWrapper) error {
395441
// Construct and send the login authentication
396442
infos := s.server.NodeInfo()
397443

@@ -436,7 +482,7 @@ func (s *Service) login(conn *websocket.Conn) error {
436482
// report collects all possible data to report and send it to the stats server.
437483
// This should only be used on reconnects or rarely to avoid overloading the
438484
// server. Use the individual methods for reporting subscribed events.
439-
func (s *Service) report(conn *websocket.Conn) error {
485+
func (s *Service) report(conn *connWrapper) error {
440486
if err := s.reportLatency(conn); err != nil {
441487
return err
442488
}
@@ -454,7 +500,7 @@ func (s *Service) report(conn *websocket.Conn) error {
454500

455501
// reportLatency sends a ping request to the server, measures the RTT time and
456502
// finally sends a latency update.
457-
func (s *Service) reportLatency(conn *websocket.Conn) error {
503+
func (s *Service) reportLatency(conn *connWrapper) error {
458504
// Send the current time to the ethstats server
459505
start := time.Now()
460506

@@ -523,7 +569,7 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
523569
}
524570

525571
// reportBlock retrieves the current chain head and reports it to the stats server.
526-
func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
572+
func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
527573
// Gather the block details from the header or block chain
528574
details := s.assembleBlockStats(block)
529575

@@ -598,7 +644,7 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
598644

599645
// reportHistory retrieves the most recent batch of blocks and reports it to the
600646
// stats server.
601-
func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
647+
func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
602648
// Figure out the indexes that need reporting
603649
indexes := make([]uint64, 0, historyUpdateRange)
604650
if len(list) > 0 {
@@ -660,7 +706,7 @@ type pendStats struct {
660706

661707
// reportPending retrieves the current number of pending transactions and reports
662708
// it to the stats server.
663-
func (s *Service) reportPending(conn *websocket.Conn) error {
709+
func (s *Service) reportPending(conn *connWrapper) error {
664710
// Retrieve the pending count from the local blockchain
665711
pending, _ := s.backend.Stats()
666712
// Assemble the transaction stats and send it to the server
@@ -691,7 +737,7 @@ type nodeStats struct {
691737

692738
// reportStats retrieves various stats about the node at the networking and
693739
// mining layer and reports it to the stats server.
694-
func (s *Service) reportStats(conn *websocket.Conn) error {
740+
func (s *Service) reportStats(conn *connWrapper) error {
695741
// Gather the syncing and mining infos from the local miner instance
696742
var (
697743
mining bool

0 commit comments

Comments
 (0)