@@ -20,127 +20,52 @@ package p2p
20
20
21
21
import (
22
22
"net"
23
- "sync"
24
- "sync/atomic"
25
- "time"
26
23
27
- "github.com/ethereum/go-ethereum/event"
28
- "github.com/ethereum/go-ethereum/log"
29
24
"github.com/ethereum/go-ethereum/metrics"
30
25
)
31
26
32
27
const (
33
- MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter
34
- MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter
35
- MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter
36
- MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter
37
-
38
- MeteredPeerLimit = 1024 // This amount of peers are individually metered
28
+ ingressMeterName = "p2p/ingress"
29
+ egressMeterName = "p2p/egress"
39
30
)
40
31
41
32
var (
42
- ingressConnectMeter = metrics .NewRegisteredMeter (MetricsInboundConnects , nil ) // Meter counting the ingress connections
43
- ingressTrafficMeter = metrics .NewRegisteredMeter (MetricsInboundTraffic , nil ) // Meter metering the cumulative ingress traffic
44
- egressConnectMeter = metrics .NewRegisteredMeter (MetricsOutboundConnects , nil ) // Meter counting the egress connections
45
- egressTrafficMeter = metrics .NewRegisteredMeter (MetricsOutboundTraffic , nil ) // Meter metering the cumulative egress traffic
46
- activePeerGauge = metrics .NewRegisteredGauge ("p2p/peers" , nil ) // Gauge tracking the current peer count
47
-
48
- PeerIngressRegistry = metrics .NewPrefixedChildRegistry (metrics .EphemeralRegistry , MetricsInboundTraffic + "/" ) // Registry containing the peer ingress
49
- PeerEgressRegistry = metrics .NewPrefixedChildRegistry (metrics .EphemeralRegistry , MetricsOutboundTraffic + "/" ) // Registry containing the peer egress
50
-
51
- meteredPeerFeed event.Feed // Event feed for peer metrics
52
- meteredPeerCount int32 // Actually stored peer connection count
53
- )
54
-
55
- // MeteredPeerEventType is the type of peer events emitted by a metered connection.
56
- type MeteredPeerEventType int
57
-
58
- const (
59
- // PeerHandshakeSucceeded is the type of event
60
- // emitted when a peer successfully makes the handshake.
61
- PeerHandshakeSucceeded MeteredPeerEventType = iota
62
-
63
- // PeerHandshakeFailed is the type of event emitted when a peer fails to
64
- // make the handshake or disconnects before it.
65
- PeerHandshakeFailed
66
-
67
- // PeerDisconnected is the type of event emitted when a peer disconnects.
68
- PeerDisconnected
33
+ ingressConnectMeter = metrics .NewRegisteredMeter ("p2p/serves" , nil )
34
+ ingressTrafficMeter = metrics .NewRegisteredMeter (ingressMeterName , nil )
35
+ egressConnectMeter = metrics .NewRegisteredMeter ("p2p/dials" , nil )
36
+ egressTrafficMeter = metrics .NewRegisteredMeter (egressMeterName , nil )
37
+ activePeerGauge = metrics .NewRegisteredGauge ("p2p/peers" , nil )
69
38
)
70
39
71
- // MeteredPeerEvent is an event emitted when peers connect or disconnect.
72
- type MeteredPeerEvent struct {
73
- Type MeteredPeerEventType // Type of peer event
74
- Addr string // TCP address of the peer
75
- Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection
76
- Peer * Peer // Connected remote node instance
77
- Ingress uint64 // Ingress count at the moment of the event
78
- Egress uint64 // Egress count at the moment of the event
79
- }
80
-
81
- // SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events
82
- // if metrics collection is enabled.
83
- func SubscribeMeteredPeerEvent (ch chan <- MeteredPeerEvent ) event.Subscription {
84
- return meteredPeerFeed .Subscribe (ch )
85
- }
86
-
87
40
// meteredConn is a wrapper around a net.Conn that meters both the
88
41
// inbound and outbound network traffic.
89
42
type meteredConn struct {
90
- net.Conn // Network connection to wrap with metering
91
-
92
- connected time.Time // Connection time of the peer
93
- addr * net.TCPAddr // TCP address of the peer
94
- peer * Peer // Peer instance
95
-
96
- // trafficMetered denotes if the peer is registered in the traffic registries.
97
- // Its value is true if the metered peer count doesn't reach the limit in the
98
- // moment of the peer's connection.
99
- trafficMetered bool
100
- ingressMeter metrics.Meter // Meter for the read bytes of the peer
101
- egressMeter metrics.Meter // Meter for the written bytes of the peer
102
-
103
- lock sync.RWMutex // Lock protecting the metered connection's internals
43
+ net.Conn
104
44
}
105
45
106
46
// newMeteredConn creates a new metered connection, bumps the ingress or egress
107
47
// connection meter and also increases the metered peer count. If the metrics
108
- // system is disabled or the IP address is unspecified, this function returns
109
- // the original object.
48
+ // system is disabled, function returns the original connection.
110
49
func newMeteredConn (conn net.Conn , ingress bool , addr * net.TCPAddr ) net.Conn {
111
50
// Short circuit if metrics are disabled
112
51
if ! metrics .Enabled {
113
52
return conn
114
53
}
115
- if addr == nil || addr .IP .IsUnspecified () {
116
- log .Warn ("Peer address is unspecified" )
117
- return conn
118
- }
119
54
// Bump the connection counters and wrap the connection
120
55
if ingress {
121
56
ingressConnectMeter .Mark (1 )
122
57
} else {
123
58
egressConnectMeter .Mark (1 )
124
59
}
125
60
activePeerGauge .Inc (1 )
126
-
127
- return & meteredConn {
128
- Conn : conn ,
129
- addr : addr ,
130
- connected : time .Now (),
131
- }
61
+ return & meteredConn {Conn : conn }
132
62
}
133
63
134
64
// Read delegates a network read to the underlying connection, bumping the common
135
65
// and the peer ingress traffic meters along the way.
136
66
func (c * meteredConn ) Read (b []byte ) (n int , err error ) {
137
67
n , err = c .Conn .Read (b )
138
68
ingressTrafficMeter .Mark (int64 (n ))
139
- c .lock .RLock ()
140
- if c .trafficMetered {
141
- c .ingressMeter .Mark (int64 (n ))
142
- }
143
- c .lock .RUnlock ()
144
69
return n , err
145
70
}
146
71
@@ -149,84 +74,15 @@ func (c *meteredConn) Read(b []byte) (n int, err error) {
149
74
func (c * meteredConn ) Write (b []byte ) (n int , err error ) {
150
75
n , err = c .Conn .Write (b )
151
76
egressTrafficMeter .Mark (int64 (n ))
152
- c .lock .RLock ()
153
- if c .trafficMetered {
154
- c .egressMeter .Mark (int64 (n ))
155
- }
156
- c .lock .RUnlock ()
157
77
return n , err
158
78
}
159
79
160
- // handshakeDone is called after the connection passes the handshake.
161
- func (c * meteredConn ) handshakeDone (peer * Peer ) {
162
- if atomic .AddInt32 (& meteredPeerCount , 1 ) >= MeteredPeerLimit {
163
- // Don't register the peer in the traffic registries.
164
- atomic .AddInt32 (& meteredPeerCount , - 1 )
165
- c .lock .Lock ()
166
- c .peer , c .trafficMetered = peer , false
167
- c .lock .Unlock ()
168
- log .Warn ("Metered peer count reached the limit" )
169
- } else {
170
- enode := peer .Node ().String ()
171
- c .lock .Lock ()
172
- c .peer , c .trafficMetered = peer , true
173
- c .ingressMeter = metrics .NewRegisteredMeter (enode , PeerIngressRegistry )
174
- c .egressMeter = metrics .NewRegisteredMeter (enode , PeerEgressRegistry )
175
- c .lock .Unlock ()
176
- }
177
- meteredPeerFeed .Send (MeteredPeerEvent {
178
- Type : PeerHandshakeSucceeded ,
179
- Addr : c .addr .String (),
180
- Peer : peer ,
181
- Elapsed : time .Since (c .connected ),
182
- })
183
- }
184
-
185
80
// Close delegates a close operation to the underlying connection, unregisters
186
81
// the peer from the traffic registries and emits close event.
187
82
func (c * meteredConn ) Close () error {
188
83
err := c .Conn .Close ()
189
- c .lock .RLock ()
190
- if c .peer == nil {
191
- // If the peer disconnects before/during the handshake.
192
- c .lock .RUnlock ()
193
- meteredPeerFeed .Send (MeteredPeerEvent {
194
- Type : PeerHandshakeFailed ,
195
- Addr : c .addr .String (),
196
- Elapsed : time .Since (c .connected ),
197
- })
198
- activePeerGauge .Dec (1 )
199
- return err
200
- }
201
- peer := c .peer
202
- if ! c .trafficMetered {
203
- // If the peer isn't registered in the traffic registries.
204
- c .lock .RUnlock ()
205
- meteredPeerFeed .Send (MeteredPeerEvent {
206
- Type : PeerDisconnected ,
207
- Addr : c .addr .String (),
208
- Peer : peer ,
209
- })
84
+ if err == nil {
210
85
activePeerGauge .Dec (1 )
211
- return err
212
86
}
213
- ingress , egress , enode := uint64 (c .ingressMeter .Count ()), uint64 (c .egressMeter .Count ()), c .peer .Node ().String ()
214
- c .lock .RUnlock ()
215
-
216
- // Decrement the metered peer count
217
- atomic .AddInt32 (& meteredPeerCount , - 1 )
218
-
219
- // Unregister the peer from the traffic registries
220
- PeerIngressRegistry .Unregister (enode )
221
- PeerEgressRegistry .Unregister (enode )
222
-
223
- meteredPeerFeed .Send (MeteredPeerEvent {
224
- Type : PeerDisconnected ,
225
- Addr : c .addr .String (),
226
- Peer : peer ,
227
- Ingress : ingress ,
228
- Egress : egress ,
229
- })
230
- activePeerGauge .Dec (1 )
231
87
return err
232
88
}
0 commit comments