Skip to content

Commit a2a6086

Browse files
committed
p2p: measure subprotocol bandwidth usage
1 parent df89233 commit a2a6086

File tree

3 files changed

+19
-1
lines changed

3 files changed

+19
-1
lines changed

p2p/message.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,13 @@ import (
3939
// separate Msg with a bytes.Reader as Payload for each send.
4040
type Msg struct {
4141
Code uint64
42-
Size uint32 // size of the paylod
42+
Size uint32 // Size of the raw payload
4343
Payload io.Reader
4444
ReceivedAt time.Time
45+
46+
meterCap Cap // Protocol name and version for egress metering
47+
meterCode uint64 // Message within protocol for egress metering
48+
meterSize uint32 // Compressed message size for ingress metering
4549
}
4650

4751
// Decode parses the RLP content of a message into

p2p/peer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/ethereum/go-ethereum/common/mclock"
2929
"github.com/ethereum/go-ethereum/event"
3030
"github.com/ethereum/go-ethereum/log"
31+
"github.com/ethereum/go-ethereum/metrics"
3132
"github.com/ethereum/go-ethereum/p2p/enode"
3233
"github.com/ethereum/go-ethereum/p2p/enr"
3334
"github.com/ethereum/go-ethereum/rlp"
@@ -300,6 +301,9 @@ func (p *Peer) handle(msg Msg) error {
300301
if err != nil {
301302
return fmt.Errorf("msg code out of range: %v", msg.Code)
302303
}
304+
if metrics.Enabled {
305+
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsInboundTraffic, proto.Name, proto.Version, msg.Code-proto.offset), nil).Mark(int64(msg.meterSize))
306+
}
303307
select {
304308
case proto.in <- msg:
305309
return nil
@@ -398,7 +402,11 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) {
398402
if msg.Code >= rw.Length {
399403
return newPeerError(errInvalidMsgCode, "not handled")
400404
}
405+
msg.meterCap = rw.cap()
406+
msg.meterCode = msg.Code
407+
401408
msg.Code += rw.offset
409+
402410
select {
403411
case <-rw.wstart:
404412
err = rw.w.WriteMsg(msg)

p2p/rlpx.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/ethereum/go-ethereum/common/bitutil"
3939
"github.com/ethereum/go-ethereum/crypto"
4040
"github.com/ethereum/go-ethereum/crypto/ecies"
41+
"github.com/ethereum/go-ethereum/metrics"
4142
"github.com/ethereum/go-ethereum/rlp"
4243
"github.com/golang/snappy"
4344
"golang.org/x/crypto/sha3"
@@ -602,6 +603,10 @@ func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
602603
msg.Payload = bytes.NewReader(payload)
603604
msg.Size = uint32(len(payload))
604605
}
606+
msg.meterSize = msg.Size
607+
if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
608+
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsOutboundTraffic, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode), nil).Mark(int64(msg.meterSize))
609+
}
605610
// write header
606611
headbuf := make([]byte, 32)
607612
fsize := uint32(len(ptype)) + msg.Size
@@ -686,6 +691,7 @@ func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) {
686691
return msg, err
687692
}
688693
msg.Size = uint32(content.Len())
694+
msg.meterSize = msg.Size
689695
msg.Payload = content
690696

691697
// if snappy is enabled, verify and decompress message

0 commit comments

Comments
 (0)