Skip to content

Commit ee445a2

Browse files
authored
Merge pull request #3425 from karalabe/netstats-time-fixup
netstats: time and block history
2 parents 4f9ccdd + b2c226c commit ee445a2

File tree

1 file changed

+197
-38
lines changed

1 file changed

+197
-38
lines changed

ethstats/ethstats.go

Lines changed: 197 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"regexp"
2626
"runtime"
2727
"strconv"
28+
"strings"
2829
"time"
2930

3031
"github.com/ethereum/go-ethereum/common"
@@ -41,6 +42,10 @@ import (
4142
"golang.org/x/net/websocket"
4243
)
4344

45+
// historyUpdateRange is the number of blocks a node should report upon login or
46+
// history request.
47+
const historyUpdateRange = 50
48+
4449
// Service implements an Ethereum netstats reporting daemon that pushes local
4550
// chain statistics up to a monitoring server.
4651
type Service struct {
@@ -53,6 +58,9 @@ type Service struct {
5358
node string // Name of the node to display on the monitoring page
5459
pass string // Password to authorize access to the monitoring page
5560
host string // Remote address of the monitoring service
61+
62+
pongCh chan struct{} // Pong notifications are fed into this channel
63+
histCh chan []uint64 // History request block numbers are fed into this channel
5664
}
5765

5866
// New returns a monitoring service ready for stats reporting.
@@ -65,11 +73,13 @@ func New(url string, ethServ *eth.Ethereum, lesServ *les.LightEthereum) (*Servic
6573
}
6674
// Assemble and return the stats service
6775
return &Service{
68-
eth: ethServ,
69-
les: lesServ,
70-
node: parts[1],
71-
pass: parts[3],
72-
host: parts[4],
76+
eth: ethServ,
77+
les: lesServ,
78+
node: parts[1],
79+
pass: parts[3],
80+
host: parts[4],
81+
pongCh: make(chan struct{}),
82+
histCh: make(chan []uint64, 1),
7383
}, nil
7484
}
7585

@@ -115,7 +125,11 @@ func (s *Service) loop() {
115125
// Loop reporting until termination
116126
for {
117127
// Establish a websocket connection to the server and authenticate the node
118-
conn, err := websocket.Dial(fmt.Sprintf("wss://%s/api", s.host), "", "http://localhost/")
128+
url := fmt.Sprintf("%s/api", s.host)
129+
if !strings.Contains(url, "://") {
130+
url = "wss://" + url
131+
}
132+
conn, err := websocket.Dial(url, "", "http://localhost/")
119133
if err != nil {
120134
glog.V(logger.Warn).Infof("Stats server unreachable: %v", err)
121135
time.Sleep(10 * time.Second)
@@ -130,22 +144,34 @@ func (s *Service) loop() {
130144
time.Sleep(10 * time.Second)
131145
continue
132146
}
133-
if err = s.report(in, out); err != nil {
147+
go s.readLoop(conn, in)
148+
149+
// Send the initial stats so our node looks decent from the get go
150+
if err = s.report(out); err != nil {
134151
glog.V(logger.Warn).Infof("Initial stats report failed: %v", err)
135152
conn.Close()
136153
continue
137154
}
155+
if err = s.reportHistory(out, nil); err != nil {
156+
glog.V(logger.Warn).Infof("History report failed: %v", err)
157+
conn.Close()
158+
continue
159+
}
138160
// Keep sending status updates until the connection breaks
139161
fullReport := time.NewTicker(15 * time.Second)
140162

141163
for err == nil {
142164
select {
143165
case <-fullReport.C:
144-
if err = s.report(in, out); err != nil {
166+
if err = s.report(out); err != nil {
145167
glog.V(logger.Warn).Infof("Full stats report failed: %v", err)
146168
}
147-
case head := <-headSub.Chan():
148-
if head == nil { // node stopped
169+
case list := <-s.histCh:
170+
if err = s.reportHistory(out, list); err != nil {
171+
glog.V(logger.Warn).Infof("Block history report failed: %v", err)
172+
}
173+
case head, ok := <-headSub.Chan():
174+
if !ok { // node stopped
149175
conn.Close()
150176
return
151177
}
@@ -155,8 +181,8 @@ func (s *Service) loop() {
155181
if err = s.reportPending(out); err != nil {
156182
glog.V(logger.Warn).Infof("Post-block transaction stats report failed: %v", err)
157183
}
158-
case ev := <-txSub.Chan():
159-
if ev == nil { // node stopped
184+
case _, ok := <-txSub.Chan():
185+
if !ok { // node stopped
160186
conn.Close()
161187
return
162188
}
@@ -178,6 +204,76 @@ func (s *Service) loop() {
178204
}
179205
}
180206

207+
// readLoop loops as long as the connection is alive and retrieves data packets
208+
// from the network socket. If any of them match an active request, it forwards
209+
// it, if they themselves are requests it initiates a reply, and lastly it drops
210+
// unknown packets.
211+
func (s *Service) readLoop(conn *websocket.Conn, in *json.Decoder) {
212+
// If the read loop exists, close the connection
213+
defer conn.Close()
214+
215+
for {
216+
// Retrieve the next generic network packet and bail out on error
217+
var msg map[string][]interface{}
218+
if err := in.Decode(&msg); err != nil {
219+
glog.V(logger.Warn).Infof("Failed to decode stats server message: %v", err)
220+
return
221+
}
222+
if len(msg["emit"]) == 0 {
223+
glog.V(logger.Warn).Infof("Stats server sent non-broadcast: %v", msg)
224+
return
225+
}
226+
command, ok := msg["emit"][0].(string)
227+
if !ok {
228+
glog.V(logger.Warn).Infof("Invalid stats server message type: %v", msg["emit"][0])
229+
return
230+
}
231+
// If the message is a ping reply, deliver (someone must be listening!)
232+
if len(msg["emit"]) == 2 && command == "node-pong" {
233+
select {
234+
case s.pongCh <- struct{}{}:
235+
// Pong delivered, continue listening
236+
continue
237+
default:
238+
// Ping routine dead, abort
239+
glog.V(logger.Warn).Infof("Stats server pinger seems to have died")
240+
return
241+
}
242+
}
243+
// If the message is a history request, forward to the event processor
244+
if len(msg["emit"]) == 2 && command == "history" {
245+
// Make sure the request is valid and doesn't crash us
246+
request, ok := msg["emit"][1].(map[string]interface{})
247+
if !ok {
248+
glog.V(logger.Warn).Infof("Invalid history request: %v", msg["emit"][1])
249+
return
250+
}
251+
list, ok := request["list"].([]interface{})
252+
if !ok {
253+
glog.V(logger.Warn).Infof("Invalid history block list: %v", request["list"])
254+
return
255+
}
256+
// Convert the block number list to an integer list
257+
numbers := make([]uint64, len(list))
258+
for i, num := range list {
259+
n, ok := num.(float64)
260+
if !ok {
261+
glog.V(logger.Warn).Infof("Invalid history block number: %v", num)
262+
return
263+
}
264+
numbers[i] = uint64(n)
265+
}
266+
select {
267+
case s.histCh <- numbers:
268+
continue
269+
default:
270+
}
271+
}
272+
// Report anything else and continue
273+
glog.V(logger.Info).Infof("Unknown stats message: %v", msg)
274+
}
275+
}
276+
181277
// nodeInfo is the collection of metainformation about a node that is displayed
182278
// on the monitoring page.
183279
type nodeInfo struct {
@@ -190,6 +286,7 @@ type nodeInfo struct {
190286
Os string `json:"os"`
191287
OsVer string `json:"os_v"`
192288
Client string `json:"client"`
289+
History bool `json:"canUpdateHistory"`
193290
}
194291

195292
// authMsg is the authentication infos needed to login to a monitoring server.
@@ -224,6 +321,7 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
224321
Os: runtime.GOOS,
225322
OsVer: runtime.GOARCH,
226323
Client: "0.1.1",
324+
History: true,
227325
},
228326
Secret: s.pass,
229327
}
@@ -244,8 +342,8 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
244342
// report collects all possible data to report and send it to the stats server.
245343
// This should only be used on reconnects or rarely to avoid overloading the
246344
// server. Use the individual methods for reporting subscribed events.
247-
func (s *Service) report(in *json.Decoder, out *json.Encoder) error {
248-
if err := s.reportLatency(in, out); err != nil {
345+
func (s *Service) report(out *json.Encoder) error {
346+
if err := s.reportLatency(out); err != nil {
249347
return err
250348
}
251349
if err := s.reportBlock(out, nil); err != nil {
@@ -262,7 +360,7 @@ func (s *Service) report(in *json.Decoder, out *json.Encoder) error {
262360

263361
// reportLatency sends a ping request to the server, measures the RTT time and
264362
// finally sends a latency update.
265-
func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
363+
func (s *Service) reportLatency(out *json.Encoder) error {
266364
// Send the current time to the ethstats server
267365
start := time.Now()
268366

@@ -276,9 +374,12 @@ func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
276374
return err
277375
}
278376
// Wait for the pong request to arrive back
279-
var pong map[string][]interface{}
280-
if err := in.Decode(&pong); err != nil || len(pong["emit"]) != 2 || pong["emit"][0].(string) != "node-pong" {
281-
return errors.New("unexpected ping reply")
377+
select {
378+
case <-s.pongCh:
379+
// Pong delivered, report the latency
380+
case <-time.After(3 * time.Second):
381+
// Ping timeout, abort
382+
return errors.New("ping timed out")
282383
}
283384
// Send back the measured latency
284385
latency := map[string][]interface{}{
@@ -297,6 +398,7 @@ func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
297398
type blockStats struct {
298399
Number *big.Int `json:"number"`
299400
Hash common.Hash `json:"hash"`
401+
Timestamp *big.Int `json:"timestamp"`
300402
Miner common.Address `json:"miner"`
301403
GasUsed *big.Int `json:"gasUsed"`
302404
GasLimit *big.Int `json:"gasLimit"`
@@ -330,9 +432,26 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
330432

331433
// reportBlock retrieves the current chain head and repors it to the stats server.
332434
func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error {
333-
// Gather the head block infos from the local blockchain
435+
// Assemble the block stats report and send it to the server
436+
stats := map[string]interface{}{
437+
"id": s.node,
438+
"block": s.assembleBlockStats(block),
439+
}
440+
report := map[string][]interface{}{
441+
"emit": []interface{}{"block", stats},
442+
}
443+
if err := out.Encode(report); err != nil {
444+
return err
445+
}
446+
return nil
447+
}
448+
449+
// assembleBlockStats retrieves any required metadata to report a single block
450+
// and assembles the block stats. If block is nil, the current head is processed.
451+
func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
452+
// Gather the block infos from the local blockchain
334453
var (
335-
head *types.Header
454+
header *types.Header
336455
td *big.Int
337456
txs []*types.Transaction
338457
uncles []*types.Header
@@ -342,37 +461,77 @@ func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error {
342461
if block == nil {
343462
block = s.eth.BlockChain().CurrentBlock()
344463
}
345-
head = block.Header()
346-
td = s.eth.BlockChain().GetTd(head.Hash(), head.Number.Uint64())
464+
header = block.Header()
465+
td = s.eth.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
347466

348467
txs = block.Transactions()
349468
uncles = block.Uncles()
350469
} else {
351470
// Light nodes would need on-demand lookups for transactions/uncles, skip
352471
if block != nil {
353-
head = block.Header()
472+
header = block.Header()
473+
} else {
474+
header = s.les.BlockChain().CurrentHeader()
475+
}
476+
td = s.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
477+
}
478+
// Assemble and return the block stats
479+
return &blockStats{
480+
Number: header.Number,
481+
Hash: header.Hash(),
482+
Timestamp: header.Time,
483+
Miner: header.Coinbase,
484+
GasUsed: new(big.Int).Set(header.GasUsed),
485+
GasLimit: new(big.Int).Set(header.GasLimit),
486+
Diff: header.Difficulty.String(),
487+
TotalDiff: td.String(),
488+
Txs: txs,
489+
Uncles: uncles,
490+
}
491+
}
492+
493+
// reportHistory retrieves the most recent batch of blocks and reports it to the
494+
// stats server.
495+
func (s *Service) reportHistory(out *json.Encoder, list []uint64) error {
496+
// Figure out the indexes that need reporting
497+
indexes := make([]uint64, 0, historyUpdateRange)
498+
if len(list) > 0 {
499+
// Specific indexes requested, send them back in particular
500+
for _, idx := range list {
501+
indexes = append(indexes, idx)
502+
}
503+
} else {
504+
// No indexes requested, send back the top ones
505+
var head *types.Header
506+
if s.eth != nil {
507+
head = s.eth.BlockChain().CurrentHeader()
354508
} else {
355509
head = s.les.BlockChain().CurrentHeader()
356510
}
357-
td = s.les.BlockChain().GetTd(head.Hash(), head.Number.Uint64())
511+
start := head.Number.Int64() - historyUpdateRange
512+
if start < 0 {
513+
start = 0
514+
}
515+
for i := uint64(start); i <= head.Number.Uint64(); i++ {
516+
indexes = append(indexes, i)
517+
}
358518
}
359-
// Assemble the block stats report and send it to the server
519+
// Gather the batch of blocks to report
520+
history := make([]*blockStats, len(indexes))
521+
for i, number := range indexes {
522+
if s.eth != nil {
523+
history[i] = s.assembleBlockStats(s.eth.BlockChain().GetBlockByNumber(number))
524+
} else {
525+
history[i] = s.assembleBlockStats(types.NewBlockWithHeader(s.les.BlockChain().GetHeaderByNumber(number)))
526+
}
527+
}
528+
// Assemble the history report and send it to the server
360529
stats := map[string]interface{}{
361-
"id": s.node,
362-
"block": &blockStats{
363-
Number: head.Number,
364-
Hash: head.Hash(),
365-
Miner: head.Coinbase,
366-
GasUsed: new(big.Int).Set(head.GasUsed),
367-
GasLimit: new(big.Int).Set(head.GasLimit),
368-
Diff: head.Difficulty.String(),
369-
TotalDiff: td.String(),
370-
Txs: txs,
371-
Uncles: uncles,
372-
},
530+
"id": s.node,
531+
"history": history,
373532
}
374533
report := map[string][]interface{}{
375-
"emit": []interface{}{"block", stats},
534+
"emit": []interface{}{"history", stats},
376535
}
377536
if err := out.Encode(report); err != nil {
378537
return err

0 commit comments

Comments
 (0)