@@ -31,6 +31,7 @@ import (
31
31
"time"
32
32
33
33
"github.com/ethereum/go-ethereum/common"
34
+ "github.com/ethereum/go-ethereum/common/mclock"
34
35
"github.com/ethereum/go-ethereum/consensus"
35
36
"github.com/ethereum/go-ethereum/core"
36
37
"github.com/ethereum/go-ethereum/core/types"
@@ -119,7 +120,7 @@ func (s *Service) Stop() error {
119
120
// loop keeps trying to connect to the netstats server, reporting chain events
120
121
// until termination.
121
122
func (s * Service ) loop () {
122
- // Subscribe tso chain events to execute updates on
123
+ // Subscribe to chain events to execute updates on
123
124
var emux * event.TypeMux
124
125
if s .eth != nil {
125
126
emux = s .eth .EventMux ()
@@ -132,6 +133,46 @@ func (s *Service) loop() {
132
133
txSub := emux .Subscribe (core.TxPreEvent {})
133
134
defer txSub .Unsubscribe ()
134
135
136
+ // Start a goroutine that exhausts the subsciptions to avoid events piling up
137
+ var (
138
+ quitCh = make (chan struct {})
139
+ headCh = make (chan * types.Block , 1 )
140
+ txCh = make (chan struct {}, 1 )
141
+ )
142
+ go func () {
143
+ var lastTx mclock.AbsTime
144
+
145
+ for {
146
+ select {
147
+ // Notify of chain head events, but drop if too frequent
148
+ case head , ok := <- headSub .Chan ():
149
+ if ! ok { // node stopped
150
+ close (quitCh )
151
+ return
152
+ }
153
+ select {
154
+ case headCh <- head .Data .(core.ChainHeadEvent ).Block :
155
+ default :
156
+ }
157
+
158
+ // Notify of new transaction events, but drop if too frequent
159
+ case _ , ok := <- txSub .Chan ():
160
+ if ! ok { // node stopped
161
+ close (quitCh )
162
+ return
163
+ }
164
+ if time .Duration (mclock .Now ()- lastTx ) < time .Second {
165
+ continue
166
+ }
167
+ lastTx = mclock .Now ()
168
+
169
+ select {
170
+ case txCh <- struct {}{}:
171
+ default :
172
+ }
173
+ }
174
+ }
175
+ }()
135
176
// Loop reporting until termination
136
177
for {
137
178
// Resolve the URL, defaulting to TLS, but falling back to none too
@@ -151,7 +192,7 @@ func (s *Service) loop() {
151
192
if conf , err = websocket .NewConfig (url , "http://localhost/" ); err != nil {
152
193
continue
153
194
}
154
- conf .Dialer = & net.Dialer {Timeout : 3 * time .Second }
195
+ conf .Dialer = & net.Dialer {Timeout : 5 * time .Second }
155
196
if conn , err = websocket .DialConfig (conf ); err == nil {
156
197
break
157
198
}
@@ -181,6 +222,10 @@ func (s *Service) loop() {
181
222
182
223
for err == nil {
183
224
select {
225
+ case <- quitCh :
226
+ conn .Close ()
227
+ return
228
+
184
229
case <- fullReport .C :
185
230
if err = s .report (conn ); err != nil {
186
231
log .Warn ("Full stats report failed" , "err" , err )
@@ -189,30 +234,14 @@ func (s *Service) loop() {
189
234
if err = s .reportHistory (conn , list ); err != nil {
190
235
log .Warn ("Requested history report failed" , "err" , err )
191
236
}
192
- case head , ok := <- headSub .Chan ():
193
- if ! ok { // node stopped
194
- conn .Close ()
195
- return
196
- }
197
- if err = s .reportBlock (conn , head .Data .(core.ChainHeadEvent ).Block ); err != nil {
237
+ case head := <- headCh :
238
+ if err = s .reportBlock (conn , head ); err != nil {
198
239
log .Warn ("Block stats report failed" , "err" , err )
199
240
}
200
241
if err = s .reportPending (conn ); err != nil {
201
242
log .Warn ("Post-block transaction stats report failed" , "err" , err )
202
243
}
203
- case _ , ok := <- txSub .Chan ():
204
- if ! ok { // node stopped
205
- conn .Close ()
206
- return
207
- }
208
- // Exhaust events to avoid reporting too frequently
209
- for exhausted := false ; ! exhausted ; {
210
- select {
211
- case <- headSub .Chan ():
212
- default :
213
- exhausted = true
214
- }
215
- }
244
+ case <- txCh :
216
245
if err = s .reportPending (conn ); err != nil {
217
246
log .Warn ("Transaction stats report failed" , "err" , err )
218
247
}
@@ -398,7 +427,7 @@ func (s *Service) reportLatency(conn *websocket.Conn) error {
398
427
select {
399
428
case <- s .pongCh :
400
429
// Pong delivered, report the latency
401
- case <- time .After (3 * time .Second ):
430
+ case <- time .After (5 * time .Second ):
402
431
// Ping timeout, abort
403
432
return errors .New ("ping timed out" )
404
433
}
0 commit comments