Skip to content

Commit 7c67855

Browse files
committed
Merge pull request #885 from fjl/p2p-fixes
p2p: more last-minute fixes
2 parents 23454dc + d4f0a67 commit 7c67855

File tree

6 files changed

+69
-39
lines changed

6 files changed

+69
-39
lines changed

cmd/utils/flags.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ var (
195195
MaxPeersFlag = cli.IntFlag{
196196
Name: "maxpeers",
197197
Usage: "Maximum number of network peers (network disabled if set to 0)",
198-
Value: 16,
198+
Value: 25,
199199
}
200200
MaxPendingPeersFlag = cli.IntFlag{
201201
Name: "maxpendpeers",

p2p/handshake.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,26 +65,26 @@ type protoHandshake struct {
6565
ID discover.NodeID
6666
}
6767

68-
// setupConn starts a protocol session on the given connection.
69-
// It runs the encryption handshake and the protocol handshake.
70-
// If dial is non-nil, the connection the local node is the initiator.
71-
// If atcap is true, the connection will be disconnected with DiscTooManyPeers
72-
// after the key exchange.
73-
func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool, trusted map[discover.NodeID]bool) (*conn, error) {
68+
// setupConn starts a protocol session on the given connection. It
69+
// runs the encryption handshake and the protocol handshake. If dial
70+
// is non-nil, the connection the local node is the initiator. If
71+
// keepconn returns false, the connection will be disconnected with
72+
// DiscTooManyPeers after the key exchange.
73+
func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, keepconn func(discover.NodeID) bool) (*conn, error) {
7474
if dial == nil {
75-
return setupInboundConn(fd, prv, our, atcap, trusted)
75+
return setupInboundConn(fd, prv, our, keepconn)
7676
} else {
77-
return setupOutboundConn(fd, prv, our, dial, atcap, trusted)
77+
return setupOutboundConn(fd, prv, our, dial, keepconn)
7878
}
7979
}
8080

81-
func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, atcap bool, trusted map[discover.NodeID]bool) (*conn, error) {
81+
func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, keepconn func(discover.NodeID) bool) (*conn, error) {
8282
secrets, err := receiverEncHandshake(fd, prv, nil)
8383
if err != nil {
8484
return nil, fmt.Errorf("encryption handshake failed: %v", err)
8585
}
8686
rw := newRlpxFrameRW(fd, secrets)
87-
if atcap && !trusted[secrets.RemoteID] {
87+
if !keepconn(secrets.RemoteID) {
8888
SendItems(rw, discMsg, DiscTooManyPeers)
8989
return nil, errors.New("we have too many peers")
9090
}
@@ -99,13 +99,13 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, a
9999
return &conn{rw, rhs}, nil
100100
}
101101

102-
func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool, trusted map[discover.NodeID]bool) (*conn, error) {
102+
func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, keepconn func(discover.NodeID) bool) (*conn, error) {
103103
secrets, err := initiatorEncHandshake(fd, prv, dial.ID, nil)
104104
if err != nil {
105105
return nil, fmt.Errorf("encryption handshake failed: %v", err)
106106
}
107107
rw := newRlpxFrameRW(fd, secrets)
108-
if atcap && !trusted[secrets.RemoteID] {
108+
if !keepconn(secrets.RemoteID) {
109109
SendItems(rw, discMsg, DiscTooManyPeers)
110110
return nil, errors.New("we have too many peers")
111111
}

p2p/handshake_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,10 @@ func TestSetupConn(t *testing.T) {
141141
fd0, fd1 := net.Pipe()
142142

143143
done := make(chan struct{})
144+
keepalways := func(discover.NodeID) bool { return true }
144145
go func() {
145146
defer close(done)
146-
conn0, err := setupConn(fd0, prv0, hs0, node1, false, nil)
147+
conn0, err := setupConn(fd0, prv0, hs0, node1, keepalways)
147148
if err != nil {
148149
t.Errorf("outbound side error: %v", err)
149150
return
@@ -156,7 +157,7 @@ func TestSetupConn(t *testing.T) {
156157
}
157158
}()
158159

159-
conn1, err := setupConn(fd1, prv1, hs1, nil, false, nil)
160+
conn1, err := setupConn(fd1, prv1, hs1, nil, keepalways)
160161
if err != nil {
161162
t.Fatalf("inbound side error: %v", err)
162163
}

p2p/peer.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,18 @@ func (p *Peer) handle(msg Msg) error {
211211
return nil
212212
}
213213

214+
func countMatchingProtocols(protocols []Protocol, caps []Cap) int {
215+
n := 0
216+
for _, cap := range caps {
217+
for _, proto := range protocols {
218+
if proto.Name == cap.Name && proto.Version == cap.Version {
219+
n++
220+
}
221+
}
222+
}
223+
return n
224+
}
225+
214226
// matchProtocols creates structures for matching named subprotocols.
215227
func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW {
216228
sort.Sort(capsByName(caps))

p2p/server.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ type Server struct {
126126
peerWG sync.WaitGroup // active peer goroutines
127127
}
128128

129-
type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, bool, map[discover.NodeID]bool) (*conn, error)
129+
type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, func(discover.NodeID) bool) (*conn, error)
130130
type newPeerHook func(*Peer)
131131

132132
// Peers returns all connected peers.
@@ -412,7 +412,7 @@ func (srv *Server) dialLoop() {
412412
defer refresh.Stop()
413413

414414
// Limit the number of concurrent dials
415-
tokens := maxAcceptConns
415+
tokens := maxDialingConns
416416
if srv.MaxPendingPeers > 0 {
417417
tokens = srv.MaxPendingPeers
418418
}
@@ -506,17 +506,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
506506
// the callers of startPeer added the peer to the wait group already.
507507
fd.SetDeadline(time.Now().Add(handshakeTimeout))
508508

509-
// Check capacity, but override for static nodes
510-
srv.lock.RLock()
511-
atcap := len(srv.peers) == srv.MaxPeers
512-
if dest != nil {
513-
if _, ok := srv.staticNodes[dest.ID]; ok {
514-
atcap = false
515-
}
516-
}
517-
srv.lock.RUnlock()
518-
519-
conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, atcap, srv.trustedNodes)
509+
conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, srv.keepconn)
520510
if err != nil {
521511
fd.Close()
522512
glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
@@ -528,7 +518,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
528518
conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
529519
}
530520
p := newPeer(fd, conn, srv.Protocols)
531-
if ok, reason := srv.addPeer(conn.ID, p); !ok {
521+
if ok, reason := srv.addPeer(conn, p); !ok {
532522
glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
533523
p.politeDisconnect(reason)
534524
srv.peerWG.Done()
@@ -539,6 +529,21 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
539529
go srv.runPeer(p)
540530
}
541531

532+
// preflight checks whether a connection should be kept. it runs
533+
// after the encryption handshake, as soon as the remote identity is
534+
// known.
535+
func (srv *Server) keepconn(id discover.NodeID) bool {
536+
srv.lock.RLock()
537+
defer srv.lock.RUnlock()
538+
if _, ok := srv.staticNodes[id]; ok {
539+
return true // static nodes are always allowed
540+
}
541+
if _, ok := srv.trustedNodes[id]; ok {
542+
return true // trusted nodes are always allowed
543+
}
544+
return len(srv.peers) < srv.MaxPeers
545+
}
546+
542547
func (srv *Server) runPeer(p *Peer) {
543548
glog.V(logger.Debug).Infof("Added %v\n", p)
544549
srvjslog.LogJson(&logger.P2PConnected{
@@ -559,13 +564,18 @@ func (srv *Server) runPeer(p *Peer) {
559564
})
560565
}
561566

562-
func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
567+
func (srv *Server) addPeer(conn *conn, p *Peer) (bool, DiscReason) {
568+
// drop connections with no matching protocols.
569+
if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, conn.protoHandshake.Caps) == 0 {
570+
return false, DiscUselessPeer
571+
}
572+
// add the peer if it passes the other checks.
563573
srv.lock.Lock()
564574
defer srv.lock.Unlock()
565-
if ok, reason := srv.checkPeer(id); !ok {
575+
if ok, reason := srv.checkPeer(conn.ID); !ok {
566576
return false, reason
567577
}
568-
srv.peers[id] = p
578+
srv.peers[conn.ID] = p
569579
return true, 0
570580
}
571581

p2p/server_test.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server {
2222
ListenAddr: "127.0.0.1:0",
2323
PrivateKey: newkey(),
2424
newPeerHook: pf,
25-
setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool, trusted map[discover.NodeID]bool) (*conn, error) {
25+
setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, keepconn func(discover.NodeID) bool) (*conn, error) {
2626
id := randomID()
27+
if !keepconn(id) {
28+
return nil, DiscAlreadyConnected
29+
}
2730
rw := newRlpxFrameRW(fd, secrets{
2831
MAC: zero16,
2932
AES: zero16,
@@ -200,7 +203,7 @@ func TestServerDisconnectAtCap(t *testing.T) {
200203
// Run the handshakes just like a real peer would.
201204
key := newkey()
202205
hs := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
203-
_, err = setupConn(conn, key, hs, srv.Self(), false, srv.trustedNodes)
206+
_, err = setupConn(conn, key, hs, srv.Self(), keepalways)
204207
if i == nconns-1 {
205208
// When handling the last connection, the server should
206209
// disconnect immediately instead of running the protocol
@@ -250,7 +253,7 @@ func TestServerStaticPeers(t *testing.T) {
250253
// Run the handshakes just like a real peer would, and wait for completion
251254
key := newkey()
252255
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
253-
if _, err = setupConn(conn, key, shake, server.Self(), false, server.trustedNodes); err != nil {
256+
if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
254257
t.Fatalf("conn %d: unexpected error: %v", i, err)
255258
}
256259
<-started
@@ -344,7 +347,7 @@ func TestServerTrustedPeers(t *testing.T) {
344347
// Run the handshakes just like a real peer would, and wait for completion
345348
key := newkey()
346349
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
347-
if _, err = setupConn(conn, key, shake, server.Self(), false, server.trustedNodes); err != nil {
350+
if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
348351
t.Fatalf("conn %d: unexpected error: %v", i, err)
349352
}
350353
<-started
@@ -357,7 +360,7 @@ func TestServerTrustedPeers(t *testing.T) {
357360
defer conn.Close()
358361

359362
shake := &protoHandshake{Version: baseProtocolVersion, ID: trusted.ID}
360-
if _, err = setupConn(conn, key, shake, server.Self(), false, server.trustedNodes); err != nil {
363+
if _, err = setupConn(conn, key, shake, server.Self(), keepalways); err != nil {
361364
t.Fatalf("trusted node: unexpected error: %v", err)
362365
}
363366
select {
@@ -472,7 +475,7 @@ func TestServerMaxPendingAccepts(t *testing.T) {
472475
go func() {
473476
key := newkey()
474477
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
475-
if _, err := setupConn(conns[1], key, shake, server.Self(), false, server.trustedNodes); err != nil {
478+
if _, err := setupConn(conns[1], key, shake, server.Self(), keepalways); err != nil {
476479
t.Fatalf("failed to run handshake: %v", err)
477480
}
478481
}()
@@ -486,7 +489,7 @@ func TestServerMaxPendingAccepts(t *testing.T) {
486489
go func() {
487490
key := newkey()
488491
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
489-
if _, err := setupConn(conns[0], key, shake, server.Self(), false, server.trustedNodes); err != nil {
492+
if _, err := setupConn(conns[0], key, shake, server.Self(), keepalways); err != nil {
490493
t.Fatalf("failed to run handshake: %v", err)
491494
}
492495
}()
@@ -513,3 +516,7 @@ func randomID() (id discover.NodeID) {
513516
}
514517
return id
515518
}
519+
520+
func keepalways(id discover.NodeID) bool {
521+
return true
522+
}

0 commit comments

Comments
 (0)