Skip to content

Commit b2c226c

Browse files
committed
ethstats: implement block history retrievals
1 parent 13614f4 commit b2c226c

File tree

1 file changed

+190
-38
lines changed

1 file changed

+190
-38
lines changed

ethstats/ethstats.go

Lines changed: 190 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ import (
4242
"golang.org/x/net/websocket"
4343
)
4444

45+
// historyUpdateRange is the number of blocks a node should report upon login or
46+
// history request.
47+
const historyUpdateRange = 50
48+
4549
// Service implements an Ethereum netstats reporting daemon that pushes local
4650
// chain statistics up to a monitoring server.
4751
type Service struct {
@@ -54,6 +58,9 @@ type Service struct {
5458
node string // Name of the node to display on the monitoring page
5559
pass string // Password to authorize access to the monitoring page
5660
host string // Remote address of the monitoring service
61+
62+
pongCh chan struct{} // Pong notifications are fed into this channel
63+
histCh chan []uint64 // History request block numbers are fed into this channel
5764
}
5865

5966
// New returns a monitoring service ready for stats reporting.
@@ -66,11 +73,13 @@ func New(url string, ethServ *eth.Ethereum, lesServ *les.LightEthereum) (*Servic
6673
}
6774
// Assemble and return the stats service
6875
return &Service{
69-
eth: ethServ,
70-
les: lesServ,
71-
node: parts[1],
72-
pass: parts[3],
73-
host: parts[4],
76+
eth: ethServ,
77+
les: lesServ,
78+
node: parts[1],
79+
pass: parts[3],
80+
host: parts[4],
81+
pongCh: make(chan struct{}),
82+
histCh: make(chan []uint64, 1),
7483
}, nil
7584
}
7685

@@ -135,22 +144,34 @@ func (s *Service) loop() {
135144
time.Sleep(10 * time.Second)
136145
continue
137146
}
138-
if err = s.report(in, out); err != nil {
147+
go s.readLoop(conn, in)
148+
149+
// Send the initial stats so our node looks decent from the get go
150+
if err = s.report(out); err != nil {
139151
glog.V(logger.Warn).Infof("Initial stats report failed: %v", err)
140152
conn.Close()
141153
continue
142154
}
155+
if err = s.reportHistory(out, nil); err != nil {
156+
glog.V(logger.Warn).Infof("History report failed: %v", err)
157+
conn.Close()
158+
continue
159+
}
143160
// Keep sending status updates until the connection breaks
144161
fullReport := time.NewTicker(15 * time.Second)
145162

146163
for err == nil {
147164
select {
148165
case <-fullReport.C:
149-
if err = s.report(in, out); err != nil {
166+
if err = s.report(out); err != nil {
150167
glog.V(logger.Warn).Infof("Full stats report failed: %v", err)
151168
}
152-
case head := <-headSub.Chan():
153-
if head == nil { // node stopped
169+
case list := <-s.histCh:
170+
if err = s.reportHistory(out, list); err != nil {
171+
glog.V(logger.Warn).Infof("Block history report failed: %v", err)
172+
}
173+
case head, ok := <-headSub.Chan():
174+
if !ok { // node stopped
154175
conn.Close()
155176
return
156177
}
@@ -160,8 +181,8 @@ func (s *Service) loop() {
160181
if err = s.reportPending(out); err != nil {
161182
glog.V(logger.Warn).Infof("Post-block transaction stats report failed: %v", err)
162183
}
163-
case ev := <-txSub.Chan():
164-
if ev == nil { // node stopped
184+
case _, ok := <-txSub.Chan():
185+
if !ok { // node stopped
165186
conn.Close()
166187
return
167188
}
@@ -183,6 +204,76 @@ func (s *Service) loop() {
183204
}
184205
}
185206

207+
// readLoop loops as long as the connection is alive and retrieves data packets
208+
// from the network socket. If any of them match an active request, it forwards
209+
// it, if they themselves are requests it initiates a reply, and lastly it drops
210+
// unknown packets.
211+
func (s *Service) readLoop(conn *websocket.Conn, in *json.Decoder) {
212+
// If the read loop exists, close the connection
213+
defer conn.Close()
214+
215+
for {
216+
// Retrieve the next generic network packet and bail out on error
217+
var msg map[string][]interface{}
218+
if err := in.Decode(&msg); err != nil {
219+
glog.V(logger.Warn).Infof("Failed to decode stats server message: %v", err)
220+
return
221+
}
222+
if len(msg["emit"]) == 0 {
223+
glog.V(logger.Warn).Infof("Stats server sent non-broadcast: %v", msg)
224+
return
225+
}
226+
command, ok := msg["emit"][0].(string)
227+
if !ok {
228+
glog.V(logger.Warn).Infof("Invalid stats server message type: %v", msg["emit"][0])
229+
return
230+
}
231+
// If the message is a ping reply, deliver (someone must be listening!)
232+
if len(msg["emit"]) == 2 && command == "node-pong" {
233+
select {
234+
case s.pongCh <- struct{}{}:
235+
// Pong delivered, continue listening
236+
continue
237+
default:
238+
// Ping routine dead, abort
239+
glog.V(logger.Warn).Infof("Stats server pinger seems to have died")
240+
return
241+
}
242+
}
243+
// If the message is a history request, forward to the event processor
244+
if len(msg["emit"]) == 2 && command == "history" {
245+
// Make sure the request is valid and doesn't crash us
246+
request, ok := msg["emit"][1].(map[string]interface{})
247+
if !ok {
248+
glog.V(logger.Warn).Infof("Invalid history request: %v", msg["emit"][1])
249+
return
250+
}
251+
list, ok := request["list"].([]interface{})
252+
if !ok {
253+
glog.V(logger.Warn).Infof("Invalid history block list: %v", request["list"])
254+
return
255+
}
256+
// Convert the block number list to an integer list
257+
numbers := make([]uint64, len(list))
258+
for i, num := range list {
259+
n, ok := num.(float64)
260+
if !ok {
261+
glog.V(logger.Warn).Infof("Invalid history block number: %v", num)
262+
return
263+
}
264+
numbers[i] = uint64(n)
265+
}
266+
select {
267+
case s.histCh <- numbers:
268+
continue
269+
default:
270+
}
271+
}
272+
// Report anything else and continue
273+
glog.V(logger.Info).Infof("Unknown stats message: %v", msg)
274+
}
275+
}
276+
186277
// nodeInfo is the collection of metainformation about a node that is displayed
187278
// on the monitoring page.
188279
type nodeInfo struct {
@@ -195,6 +286,7 @@ type nodeInfo struct {
195286
Os string `json:"os"`
196287
OsVer string `json:"os_v"`
197288
Client string `json:"client"`
289+
History bool `json:"canUpdateHistory"`
198290
}
199291

200292
// authMsg is the authentication infos needed to login to a monitoring server.
@@ -229,6 +321,7 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
229321
Os: runtime.GOOS,
230322
OsVer: runtime.GOARCH,
231323
Client: "0.1.1",
324+
History: true,
232325
},
233326
Secret: s.pass,
234327
}
@@ -249,8 +342,8 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error {
249342
// report collects all possible data to report and send it to the stats server.
250343
// This should only be used on reconnects or rarely to avoid overloading the
251344
// server. Use the individual methods for reporting subscribed events.
252-
func (s *Service) report(in *json.Decoder, out *json.Encoder) error {
253-
if err := s.reportLatency(in, out); err != nil {
345+
func (s *Service) report(out *json.Encoder) error {
346+
if err := s.reportLatency(out); err != nil {
254347
return err
255348
}
256349
if err := s.reportBlock(out, nil); err != nil {
@@ -267,7 +360,7 @@ func (s *Service) report(in *json.Decoder, out *json.Encoder) error {
267360

268361
// reportLatency sends a ping request to the server, measures the RTT time and
269362
// finally sends a latency update.
270-
func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
363+
func (s *Service) reportLatency(out *json.Encoder) error {
271364
// Send the current time to the ethstats server
272365
start := time.Now()
273366

@@ -281,9 +374,12 @@ func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error {
281374
return err
282375
}
283376
// Wait for the pong request to arrive back
284-
var pong map[string][]interface{}
285-
if err := in.Decode(&pong); err != nil || len(pong["emit"]) != 2 || pong["emit"][0].(string) != "node-pong" {
286-
return errors.New("unexpected ping reply")
377+
select {
378+
case <-s.pongCh:
379+
// Pong delivered, report the latency
380+
case <-time.After(3 * time.Second):
381+
// Ping timeout, abort
382+
return errors.New("ping timed out")
287383
}
288384
// Send back the measured latency
289385
latency := map[string][]interface{}{
@@ -336,9 +432,26 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
336432

337433
// reportBlock retrieves the current chain head and repors it to the stats server.
338434
func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error {
339-
// Gather the head block infos from the local blockchain
435+
// Assemble the block stats report and send it to the server
436+
stats := map[string]interface{}{
437+
"id": s.node,
438+
"block": s.assembleBlockStats(block),
439+
}
440+
report := map[string][]interface{}{
441+
"emit": []interface{}{"block", stats},
442+
}
443+
if err := out.Encode(report); err != nil {
444+
return err
445+
}
446+
return nil
447+
}
448+
449+
// assembleBlockStats retrieves any required metadata to report a single block
450+
// and assembles the block stats. If block is nil, the current head is processed.
451+
func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
452+
// Gather the block infos from the local blockchain
340453
var (
341-
head *types.Header
454+
header *types.Header
342455
td *big.Int
343456
txs []*types.Transaction
344457
uncles []*types.Header
@@ -348,38 +461,77 @@ func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error {
348461
if block == nil {
349462
block = s.eth.BlockChain().CurrentBlock()
350463
}
351-
head = block.Header()
352-
td = s.eth.BlockChain().GetTd(head.Hash(), head.Number.Uint64())
464+
header = block.Header()
465+
td = s.eth.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
353466

354467
txs = block.Transactions()
355468
uncles = block.Uncles()
356469
} else {
357470
// Light nodes would need on-demand lookups for transactions/uncles, skip
358471
if block != nil {
359-
head = block.Header()
472+
header = block.Header()
473+
} else {
474+
header = s.les.BlockChain().CurrentHeader()
475+
}
476+
td = s.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
477+
}
478+
// Assemble and return the block stats
479+
return &blockStats{
480+
Number: header.Number,
481+
Hash: header.Hash(),
482+
Timestamp: header.Time,
483+
Miner: header.Coinbase,
484+
GasUsed: new(big.Int).Set(header.GasUsed),
485+
GasLimit: new(big.Int).Set(header.GasLimit),
486+
Diff: header.Difficulty.String(),
487+
TotalDiff: td.String(),
488+
Txs: txs,
489+
Uncles: uncles,
490+
}
491+
}
492+
493+
// reportHistory retrieves the most recent batch of blocks and reports it to the
494+
// stats server.
495+
func (s *Service) reportHistory(out *json.Encoder, list []uint64) error {
496+
// Figure out the indexes that need reporting
497+
indexes := make([]uint64, 0, historyUpdateRange)
498+
if len(list) > 0 {
499+
// Specific indexes requested, send them back in particular
500+
for _, idx := range list {
501+
indexes = append(indexes, idx)
502+
}
503+
} else {
504+
// No indexes requested, send back the top ones
505+
var head *types.Header
506+
if s.eth != nil {
507+
head = s.eth.BlockChain().CurrentHeader()
360508
} else {
361509
head = s.les.BlockChain().CurrentHeader()
362510
}
363-
td = s.les.BlockChain().GetTd(head.Hash(), head.Number.Uint64())
511+
start := head.Number.Int64() - historyUpdateRange
512+
if start < 0 {
513+
start = 0
514+
}
515+
for i := uint64(start); i <= head.Number.Uint64(); i++ {
516+
indexes = append(indexes, i)
517+
}
364518
}
365-
// Assemble the block stats report and send it to the server
519+
// Gather the batch of blocks to report
520+
history := make([]*blockStats, len(indexes))
521+
for i, number := range indexes {
522+
if s.eth != nil {
523+
history[i] = s.assembleBlockStats(s.eth.BlockChain().GetBlockByNumber(number))
524+
} else {
525+
history[i] = s.assembleBlockStats(types.NewBlockWithHeader(s.les.BlockChain().GetHeaderByNumber(number)))
526+
}
527+
}
528+
// Assemble the history report and send it to the server
366529
stats := map[string]interface{}{
367-
"id": s.node,
368-
"block": &blockStats{
369-
Number: head.Number,
370-
Hash: head.Hash(),
371-
Timestamp: head.Time,
372-
Miner: head.Coinbase,
373-
GasUsed: new(big.Int).Set(head.GasUsed),
374-
GasLimit: new(big.Int).Set(head.GasLimit),
375-
Diff: head.Difficulty.String(),
376-
TotalDiff: td.String(),
377-
Txs: txs,
378-
Uncles: uncles,
379-
},
530+
"id": s.node,
531+
"history": history,
380532
}
381533
report := map[string][]interface{}{
382-
"emit": []interface{}{"block", stats},
534+
"emit": []interface{}{"history", stats},
383535
}
384536
if err := out.Encode(report); err != nil {
385537
return err

0 commit comments

Comments
 (0)