Skip to content

Commit 23454dc

Browse files
committed
Merge pull request #840 from karalabe/throttled-dialing
p2p: throttled handshakes
2 parents f819ac7 + 8735e5a commit 23454dc

File tree

6 files changed

+180
-19
lines changed

6 files changed

+180
-19
lines changed

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
242242
utils.JSpathFlag,
243243
utils.ListenPortFlag,
244244
utils.MaxPeersFlag,
245+
utils.MaxPendingPeersFlag,
245246
utils.EtherbaseFlag,
246247
utils.MinerThreadsFlag,
247248
utils.MiningEnabledFlag,

cmd/mist/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func init() {
7575
utils.LogFileFlag,
7676
utils.LogLevelFlag,
7777
utils.MaxPeersFlag,
78+
utils.MaxPendingPeersFlag,
7879
utils.MinerThreadsFlag,
7980
utils.NATFlag,
8081
utils.NodeKeyFileFlag,

cmd/utils/flags.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ var (
197197
Usage: "Maximum number of network peers (network disabled if set to 0)",
198198
Value: 16,
199199
}
200+
MaxPendingPeersFlag = cli.IntFlag{
201+
Name: "maxpendpeers",
202+
Usage: "Maximum number of pending connection attempts (defaults used if set to 0)",
203+
Value: 0,
204+
}
200205
ListenPortFlag = cli.IntFlag{
201206
Name: "port",
202207
Usage: "Network listening port",
@@ -292,6 +297,7 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
292297
AccountManager: GetAccountManager(ctx),
293298
VmDebug: ctx.GlobalBool(VMDebugFlag.Name),
294299
MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name),
300+
MaxPendingPeers: ctx.GlobalInt(MaxPendingPeersFlag.Name),
295301
Port: ctx.GlobalString(ListenPortFlag.Name),
296302
NAT: GetNAT(ctx),
297303
NatSpec: ctx.GlobalBool(NatspecEnabledFlag.Name),

eth/backend.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ type Config struct {
6060
VmDebug bool
6161
NatSpec bool
6262

63-
MaxPeers int
64-
Port string
63+
MaxPeers int
64+
MaxPendingPeers int
65+
Port string
6566

6667
// Space-separated list of discovery node URLs
6768
BootNodes string
@@ -280,16 +281,17 @@ func New(config *Config) (*Ethereum, error) {
280281
protocols = append(protocols, eth.whisper.Protocol())
281282
}
282283
eth.net = &p2p.Server{
283-
PrivateKey: netprv,
284-
Name: config.Name,
285-
MaxPeers: config.MaxPeers,
286-
Protocols: protocols,
287-
NAT: config.NAT,
288-
NoDial: !config.Dial,
289-
BootstrapNodes: config.parseBootNodes(),
290-
StaticNodes: config.parseNodes(staticNodes),
291-
TrustedNodes: config.parseNodes(trustedNodes),
292-
NodeDatabase: nodeDb,
284+
PrivateKey: netprv,
285+
Name: config.Name,
286+
MaxPeers: config.MaxPeers,
287+
MaxPendingPeers: config.MaxPendingPeers,
288+
Protocols: protocols,
289+
NAT: config.NAT,
290+
NoDial: !config.Dial,
291+
BootstrapNodes: config.parseBootNodes(),
292+
StaticNodes: config.parseNodes(staticNodes),
293+
TrustedNodes: config.parseNodes(trustedNodes),
294+
NodeDatabase: nodeDb,
293295
}
294296
if len(config.Port) > 0 {
295297
eth.net.ListenAddr = ":" + config.Port

p2p/server.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ const (
2222
refreshPeersInterval = 30 * time.Second
2323
staticPeerCheckInterval = 15 * time.Second
2424

25-
// This is the maximum number of inbound connection
26-
// that are allowed to linger between 'accepted' and
27-
// 'added as peer'.
28-
maxAcceptConns = 50
25+
// Maximum number of concurrently handshaking inbound connections.
26+
maxAcceptConns = 10
27+
28+
// Maximum number of concurrently dialing outbound connections.
29+
maxDialingConns = 10
2930

3031
// total timeout for encryption handshake and protocol
3132
// handshake in both directions.
@@ -52,6 +53,11 @@ type Server struct {
5253
// connected. It must be greater than zero.
5354
MaxPeers int
5455

56+
// MaxPendingPeers is the maximum number of peers that can be pending in the
57+
// handshake phase, counted separately for inbound and outbound connections.
58+
// Zero defaults to preset values.
59+
MaxPendingPeers int
60+
5561
// Name sets the node name of this server.
5662
// Use common.MakeName to create a name that follows existing conventions.
5763
Name string
@@ -331,8 +337,12 @@ func (srv *Server) listenLoop() {
331337
// This channel acts as a semaphore limiting
332338
// active inbound connections that are lingering pre-handshake.
333339
// If all slots are taken, no further connections are accepted.
334-
slots := make(chan struct{}, maxAcceptConns)
335-
for i := 0; i < maxAcceptConns; i++ {
340+
tokens := maxAcceptConns
341+
if srv.MaxPendingPeers > 0 {
342+
tokens = srv.MaxPendingPeers
343+
}
344+
slots := make(chan struct{}, tokens)
345+
for i := 0; i < tokens; i++ {
336346
slots <- struct{}{}
337347
}
338348

@@ -401,7 +411,15 @@ func (srv *Server) dialLoop() {
401411
defer srv.loopWG.Done()
402412
defer refresh.Stop()
403413

404-
// TODO: maybe limit number of active dials
414+
// Limit the number of concurrent dials
415+
tokens := maxAcceptConns
416+
if srv.MaxPendingPeers > 0 {
417+
tokens = srv.MaxPendingPeers
418+
}
419+
slots := make(chan struct{}, tokens)
420+
for i := 0; i < tokens; i++ {
421+
slots <- struct{}{}
422+
}
405423
dial := func(dest *discover.Node) {
406424
// Don't dial nodes that would fail the checks in addPeer.
407425
// This is important because the connection handshake is a lot
@@ -413,11 +431,14 @@ func (srv *Server) dialLoop() {
413431
if !ok || dialing[dest.ID] {
414432
return
415433
}
434+
// Request a dial slot to prevent CPU exhaustion
435+
<-slots
416436

417437
dialing[dest.ID] = true
418438
srv.peerWG.Add(1)
419439
go func() {
420440
srv.dialNode(dest)
441+
slots <- struct{}{}
421442
dialed <- dest
422443
}()
423444
}

p2p/server_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,136 @@ func TestServerTrustedPeers(t *testing.T) {
369369
}
370370
}
371371

372+
// Tests that a failed dial will temporarily throttle a peer.
373+
func TestServerMaxPendingDials(t *testing.T) {
374+
defer testlog(t).detach()
375+
376+
// Start a simple test server
377+
server := &Server{
378+
ListenAddr: "127.0.0.1:0",
379+
PrivateKey: newkey(),
380+
MaxPeers: 10,
381+
MaxPendingPeers: 1,
382+
}
383+
if err := server.Start(); err != nil {
384+
t.Fatal("failed to start test server: %v", err)
385+
}
386+
defer server.Stop()
387+
388+
// Simulate two separate remote peers
389+
peers := make(chan *discover.Node, 2)
390+
conns := make(chan net.Conn, 2)
391+
for i := 0; i < 2; i++ {
392+
listener, err := net.Listen("tcp", "127.0.0.1:0")
393+
if err != nil {
394+
t.Fatalf("listener %d: failed to setup: %v", i, err)
395+
}
396+
defer listener.Close()
397+
398+
addr := listener.Addr().(*net.TCPAddr)
399+
peers <- &discover.Node{
400+
ID: discover.PubkeyID(&newkey().PublicKey),
401+
IP: addr.IP,
402+
TCP: uint16(addr.Port),
403+
}
404+
go func() {
405+
conn, err := listener.Accept()
406+
if err == nil {
407+
conns <- conn
408+
}
409+
}()
410+
}
411+
// Request a dial for both peers
412+
go func() {
413+
for i := 0; i < 2; i++ {
414+
server.staticDial <- <-peers // hack piggybacking the static implementation
415+
}
416+
}()
417+
418+
// Make sure only one outbound connection goes through
419+
var conn net.Conn
420+
421+
select {
422+
case conn = <-conns:
423+
case <-time.After(100 * time.Millisecond):
424+
t.Fatalf("first dial timeout")
425+
}
426+
select {
427+
case conn = <-conns:
428+
t.Fatalf("second dial completed prematurely")
429+
case <-time.After(100 * time.Millisecond):
430+
}
431+
// Finish the first dial, check the second
432+
conn.Close()
433+
select {
434+
case conn = <-conns:
435+
conn.Close()
436+
437+
case <-time.After(100 * time.Millisecond):
438+
t.Fatalf("second dial timeout")
439+
}
440+
}
441+
442+
func TestServerMaxPendingAccepts(t *testing.T) {
443+
defer testlog(t).detach()
444+
445+
// Start a test server and a peer sink for synchronization
446+
started := make(chan *Peer)
447+
server := &Server{
448+
ListenAddr: "127.0.0.1:0",
449+
PrivateKey: newkey(),
450+
MaxPeers: 10,
451+
MaxPendingPeers: 1,
452+
NoDial: true,
453+
newPeerHook: func(p *Peer) { started <- p },
454+
}
455+
if err := server.Start(); err != nil {
456+
t.Fatal("failed to start test server: %v", err)
457+
}
458+
defer server.Stop()
459+
460+
// Try and connect to the server on multiple threads concurrently
461+
conns := make([]net.Conn, 2)
462+
for i := 0; i < 2; i++ {
463+
dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)}
464+
465+
conn, err := dialer.Dial("tcp", server.ListenAddr)
466+
if err != nil {
467+
t.Fatalf("failed to dial server: %v", err)
468+
}
469+
conns[i] = conn
470+
}
471+
// Check that a handshake on the second doesn't pass
472+
go func() {
473+
key := newkey()
474+
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
475+
if _, err := setupConn(conns[1], key, shake, server.Self(), false, server.trustedNodes); err != nil {
476+
t.Fatalf("failed to run handshake: %v", err)
477+
}
478+
}()
479+
select {
480+
case <-started:
481+
t.Fatalf("handshake on second connection accepted")
482+
483+
case <-time.After(time.Second):
484+
}
485+
// Shake on first, check that both go through
486+
go func() {
487+
key := newkey()
488+
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
489+
if _, err := setupConn(conns[0], key, shake, server.Self(), false, server.trustedNodes); err != nil {
490+
t.Fatalf("failed to run handshake: %v", err)
491+
}
492+
}()
493+
for i := 0; i < 2; i++ {
494+
select {
495+
case <-started:
496+
case <-time.After(time.Second):
497+
t.Fatalf("peer %d: handshake timeout", i)
498+
}
499+
}
500+
}
501+
372502
func newkey() *ecdsa.PrivateKey {
373503
key, err := crypto.GenerateKey()
374504
if err != nil {

0 commit comments

Comments
 (0)