Skip to content

Commit bba5fd8

Browse files
holisticodenonsense
authored andcommitted
Accounting metrics reporter (#18136)
1 parent 2714e8f commit bba5fd8

File tree

6 files changed

+305
-30
lines changed

6 files changed

+305
-30
lines changed

p2p/protocols/accounting.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,32 @@
1616

1717
package protocols
1818

19-
import "github.com/ethereum/go-ethereum/metrics"
19+
import (
20+
"time"
21+
22+
"github.com/ethereum/go-ethereum/metrics"
23+
)
2024

2125
//define some metrics
2226
var (
23-
//NOTE: these metrics just define the interfaces and are currently *NOT persisted* over sessions
2427
//All metrics are cumulative
2528

2629
//total amount of units credited
27-
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", nil)
30+
mBalanceCredit metrics.Counter
2831
//total amount of units debited
29-
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", nil)
32+
mBalanceDebit metrics.Counter
3033
//total amount of bytes credited
31-
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", nil)
34+
mBytesCredit metrics.Counter
3235
//total amount of bytes debited
33-
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", nil)
36+
mBytesDebit metrics.Counter
3437
//total amount of credited messages
35-
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", nil)
38+
mMsgCredit metrics.Counter
3639
//total amount of debited messages
37-
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", nil)
40+
mMsgDebit metrics.Counter
3841
//how many times local node had to drop remote peers
39-
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", nil)
42+
mPeerDrops metrics.Counter
4043
//how many times local node overdrafted and dropped
41-
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", nil)
44+
mSelfDrops metrics.Counter
4245
)
4346

4447
//Prices defines how prices are being passed on to the accounting instance
@@ -105,6 +108,26 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
105108
return ah
106109
}
107110

111+
//SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
112+
//this registry should be independent of any other metrics as it persists at different endpoints.
113+
//It also instantiates the given metrics and starts the persisting go-routine which
114+
//at the passed interval writes the metrics to a LevelDB
115+
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
116+
//create an empty registry
117+
registry := metrics.NewRegistry()
118+
//instantiate the metrics
119+
mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry)
120+
mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry)
121+
mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry)
122+
mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry)
123+
mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry)
124+
mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry)
125+
mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry)
126+
mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry)
127+
//create the DB and start persisting
128+
return NewAccountingMetrics(registry, reportInterval, path)
129+
}
130+
108131
//Implement Hook.Send
109132
// Send takes a peer, a size and a msg and
110133
// - calculates the cost for the local node sending a msg of size to peer using the Prices interface

p2p/protocols/accounting_simulation_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ import (
2020
"context"
2121
"flag"
2222
"fmt"
23+
"io/ioutil"
2324
"math/rand"
25+
"os"
26+
"path/filepath"
2427
"reflect"
2528
"sync"
2629
"testing"
@@ -66,6 +69,13 @@ func init() {
6669
func TestAccountingSimulation(t *testing.T) {
6770
//setup the balances objects for every node
6871
bal := newBalances(*nodes)
72+
//setup the metrics system or tests will fail trying to write metrics
73+
dir, err := ioutil.TempDir("", "account-sim")
74+
if err != nil {
75+
t.Fatal(err)
76+
}
77+
defer os.RemoveAll(dir)
78+
SetupAccountingMetrics(1*time.Second, filepath.Join(dir, "metrics.db"))
6979
//define the node.Service for this test
7080
services := adapters.Services{
7181
"accounting": func(ctx *adapters.ServiceContext) (node.Service, error) {

p2p/protocols/reporter.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright 2018 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package protocols
18+
19+
import (
20+
"encoding/binary"
21+
"time"
22+
23+
"github.com/ethereum/go-ethereum/log"
24+
"github.com/ethereum/go-ethereum/metrics"
25+
26+
"github.com/syndtr/goleveldb/leveldb"
27+
)
28+
29+
//AccountMetrics abstracts away the metrics DB and
30+
//the reporter to persist metrics
31+
type AccountingMetrics struct {
32+
reporter *reporter
33+
}
34+
35+
//Close will be called when the node is being shutdown
36+
//for a graceful cleanup
37+
func (am *AccountingMetrics) Close() {
38+
close(am.reporter.quit)
39+
am.reporter.db.Close()
40+
}
41+
42+
//reporter is an internal structure used to write p2p accounting related
43+
//metrics to a LevelDB. It will periodically write the accrued metrics to the DB.
44+
type reporter struct {
45+
reg metrics.Registry //the registry for these metrics (independent of other metrics)
46+
interval time.Duration //duration at which the reporter will persist metrics
47+
db *leveldb.DB //the actual DB
48+
quit chan struct{} //quit the reporter loop
49+
}
50+
51+
//NewMetricsDB creates a new LevelDB instance used to persist metrics defined
52+
//inside p2p/protocols/accounting.go
53+
func NewAccountingMetrics(r metrics.Registry, d time.Duration, path string) *AccountingMetrics {
54+
var val = make([]byte, 8)
55+
var err error
56+
57+
//Create the LevelDB
58+
db, err := leveldb.OpenFile(path, nil)
59+
if err != nil {
60+
log.Error(err.Error())
61+
return nil
62+
}
63+
64+
//Check for all defined metrics that there is a value in the DB
65+
//If there is, assign it to the metric. This means that the node
66+
//has been running before and that metrics have been persisted.
67+
metricsMap := map[string]metrics.Counter{
68+
"account.balance.credit": mBalanceCredit,
69+
"account.balance.debit": mBalanceDebit,
70+
"account.bytes.credit": mBytesCredit,
71+
"account.bytes.debit": mBytesDebit,
72+
"account.msg.credit": mMsgCredit,
73+
"account.msg.debit": mMsgDebit,
74+
"account.peerdrops": mPeerDrops,
75+
"account.selfdrops": mSelfDrops,
76+
}
77+
//iterate the map and get the values
78+
for key, metric := range metricsMap {
79+
val, err = db.Get([]byte(key), nil)
80+
//until the first time a value is being written,
81+
//this will return an error.
82+
//it could be beneficial though to log errors later,
83+
//but that would require a different logic
84+
if err == nil {
85+
metric.Inc(int64(binary.BigEndian.Uint64(val)))
86+
}
87+
}
88+
89+
//create the reporter
90+
rep := &reporter{
91+
reg: r,
92+
interval: d,
93+
db: db,
94+
quit: make(chan struct{}),
95+
}
96+
97+
//run the go routine
98+
go rep.run()
99+
100+
m := &AccountingMetrics{
101+
reporter: rep,
102+
}
103+
104+
return m
105+
}
106+
107+
//run is the goroutine which periodically sends the metrics to the configured LevelDB
108+
func (r *reporter) run() {
109+
intervalTicker := time.NewTicker(r.interval)
110+
111+
for {
112+
select {
113+
case <-intervalTicker.C:
114+
//at each tick send the metrics
115+
if err := r.save(); err != nil {
116+
log.Error("unable to send metrics to LevelDB", "err", err)
117+
//If there is an error in writing, exit the routine; we assume here that the error is
118+
//severe and don't attempt to write again.
119+
//Also, this should prevent leaking when the node is stopped
120+
return
121+
}
122+
case <-r.quit:
123+
//graceful shutdown
124+
return
125+
}
126+
}
127+
}
128+
129+
//send the metrics to the DB
130+
func (r *reporter) save() error {
131+
//create a LevelDB Batch
132+
batch := leveldb.Batch{}
133+
//for each metric in the registry (which is independent)...
134+
r.reg.Each(func(name string, i interface{}) {
135+
metric, ok := i.(metrics.Counter)
136+
if ok {
137+
//assuming every metric here to be a Counter (separate registry)
138+
//...create a snapshot...
139+
ms := metric.Snapshot()
140+
byteVal := make([]byte, 8)
141+
binary.BigEndian.PutUint64(byteVal, uint64(ms.Count()))
142+
//...and save the value to the DB
143+
batch.Put([]byte(name), byteVal)
144+
}
145+
})
146+
return r.db.Write(&batch, nil)
147+
}

p2p/protocols/reporter_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright 2018 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package protocols
18+
19+
import (
20+
"io/ioutil"
21+
"os"
22+
"path/filepath"
23+
"testing"
24+
"time"
25+
26+
"github.com/ethereum/go-ethereum/log"
27+
)
28+
29+
//TestReporter tests that the metrics being collected for p2p accounting
30+
//are being persisted and available after restart of a node.
31+
//It simulates restarting by just recreating the DB as if the node had restarted.
32+
func TestReporter(t *testing.T) {
33+
//create a test directory
34+
dir, err := ioutil.TempDir("", "reporter-test")
35+
if err != nil {
36+
t.Fatal(err)
37+
}
38+
defer os.RemoveAll(dir)
39+
40+
//setup the metrics
41+
log.Debug("Setting up metrics first time")
42+
reportInterval := 5 * time.Millisecond
43+
metrics := SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
44+
log.Debug("Done.")
45+
46+
//do some metrics
47+
mBalanceCredit.Inc(12)
48+
mBytesCredit.Inc(34)
49+
mMsgDebit.Inc(9)
50+
51+
//give the reporter time to write the metrics to DB
52+
time.Sleep(20 * time.Millisecond)
53+
54+
//set the metrics to nil - this effectively simulates the node having shut down...
55+
mBalanceCredit = nil
56+
mBytesCredit = nil
57+
mMsgDebit = nil
58+
//close the DB also, or we can't create a new one
59+
metrics.Close()
60+
61+
//setup the metrics again
62+
log.Debug("Setting up metrics second time")
63+
metrics = SetupAccountingMetrics(reportInterval, filepath.Join(dir, "test.db"))
64+
defer metrics.Close()
65+
log.Debug("Done.")
66+
67+
//now check the metrics, they should have the same value as before "shutdown"
68+
if mBalanceCredit.Count() != 12 {
69+
t.Fatalf("Expected counter to be %d, but is %d", 12, mBalanceCredit.Count())
70+
}
71+
if mBytesCredit.Count() != 34 {
72+
t.Fatalf("Expected counter to be %d, but is %d", 23, mBytesCredit.Count())
73+
}
74+
if mMsgDebit.Count() != 9 {
75+
t.Fatalf("Expected counter to be %d, but is %d", 9, mMsgDebit.Count())
76+
}
77+
}

swarm/swap/swap.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,8 @@ func (s *Swap) loadState(peer *protocols.Peer) (err error) {
9191
}
9292
return
9393
}
94+
95+
//Clean up Swap
96+
func (swap *Swap) Close() {
97+
swap.stateStore.Close()
98+
}

0 commit comments

Comments
 (0)