Skip to content

Commit 258cc73

Browse files
committed
Merge pull request #2510 from fjl/p2p-fixups
p2p/discover: prevent bonding self
2 parents 79b7b5e + 8110671 commit 258cc73

File tree

3 files changed

+82
-29
lines changed

3 files changed

+82
-29
lines changed

p2p/discover/table.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package discover
2525
import (
2626
"crypto/rand"
2727
"encoding/binary"
28+
"errors"
2829
"fmt"
2930
"net"
3031
"sort"
@@ -457,6 +458,9 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
457458
// If pinged is true, the remote node has just pinged us and one half
458459
// of the process can be skipped.
459460
func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
461+
if id == tab.self.ID {
462+
return nil, errors.New("is self")
463+
}
460464
// Retrieve a previously known node and any recent findnode failures
461465
node, fails := tab.db.node(id), 0
462466
if node != nil {

p2p/server.go

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -398,12 +398,11 @@ type dialer interface {
398398
func (srv *Server) run(dialstate dialer) {
399399
defer srv.loopWG.Done()
400400
var (
401-
peers = make(map[discover.NodeID]*Peer)
402-
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
403-
404-
tasks []task
405-
pendingTasks []task
401+
peers = make(map[discover.NodeID]*Peer)
402+
trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
406403
taskdone = make(chan task, maxActiveDialTasks)
404+
runningTasks []task
405+
queuedTasks []task // tasks that can't run yet
407406
)
408407
// Put trusted nodes into a map to speed up checks.
409408
// Trusted peers are loaded on startup and cannot be
@@ -412,39 +411,39 @@ func (srv *Server) run(dialstate dialer) {
412411
trusted[n.ID] = true
413412
}
414413

415-
// Some task list helpers.
414+
// removes t from runningTasks
416415
delTask := func(t task) {
417-
for i := range tasks {
418-
if tasks[i] == t {
419-
tasks = append(tasks[:i], tasks[i+1:]...)
416+
for i := range runningTasks {
417+
if runningTasks[i] == t {
418+
runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
420419
break
421420
}
422421
}
423422
}
424-
scheduleTasks := func(new []task) {
425-
pt := append(pendingTasks, new...)
426-
start := maxActiveDialTasks - len(tasks)
427-
if len(pt) < start {
428-
start = len(pt)
423+
// starts until max number of active tasks is satisfied
424+
startTasks := func(ts []task) (rest []task) {
425+
i := 0
426+
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
427+
t := ts[i]
428+
glog.V(logger.Detail).Infoln("new task:", t)
429+
go func() { t.Do(srv); taskdone <- t }()
430+
runningTasks = append(runningTasks, t)
429431
}
430-
if start > 0 {
431-
tasks = append(tasks, pt[:start]...)
432-
for _, t := range pt[:start] {
433-
t := t
434-
glog.V(logger.Detail).Infoln("new task:", t)
435-
go func() { t.Do(srv); taskdone <- t }()
436-
}
437-
copy(pt, pt[start:])
438-
pendingTasks = pt[:len(pt)-start]
432+
return ts[i:]
433+
}
434+
scheduleTasks := func() {
435+
// Start from queue first.
436+
queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
437+
// Query dialer for new tasks and start as many as possible now.
438+
if len(runningTasks) < maxActiveDialTasks {
439+
nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
440+
queuedTasks = append(queuedTasks, startTasks(nt)...)
439441
}
440442
}
441443

442444
running:
443445
for {
444-
// Query the dialer for new tasks and launch them.
445-
now := time.Now()
446-
nt := dialstate.newTasks(len(pendingTasks)+len(tasks), peers, now)
447-
scheduleTasks(nt)
446+
scheduleTasks()
448447

449448
select {
450449
case <-srv.quit:
@@ -466,7 +465,7 @@ running:
466465
// can update its state and remove it from the active
467466
// tasks list.
468467
glog.V(logger.Detail).Infoln("<-taskdone:", t)
469-
dialstate.taskDone(t, now)
468+
dialstate.taskDone(t, time.Now())
470469
delTask(t)
471470
case c := <-srv.posthandshake:
472471
// A connection has passed the encryption handshake so
@@ -513,7 +512,7 @@ running:
513512
// Wait for peers to shut down. Pending connections and tasks are
514513
// not handled here and will terminate soon-ish because srv.quit
515514
// is closed.
516-
glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(tasks))
515+
glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(runningTasks))
517516
for len(peers) > 0 {
518517
p := <-srv.delpeer
519518
glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p)

p2p/server_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,56 @@ func TestServerTaskScheduling(t *testing.T) {
235235
}
236236
}
237237

238+
// This test checks that Server doesn't drop tasks,
239+
// even if newTasks returns more than the maximum number of tasks.
240+
func TestServerManyTasks(t *testing.T) {
241+
alltasks := make([]task, 300)
242+
for i := range alltasks {
243+
alltasks[i] = &testTask{index: i}
244+
}
245+
246+
var (
247+
srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true}
248+
done = make(chan *testTask)
249+
start, end = 0, 0
250+
)
251+
defer srv.Stop()
252+
srv.loopWG.Add(1)
253+
go srv.run(taskgen{
254+
newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
255+
start, end = end, end+maxActiveDialTasks+10
256+
if end > len(alltasks) {
257+
end = len(alltasks)
258+
}
259+
return alltasks[start:end]
260+
},
261+
doneFunc: func(tt task) {
262+
done <- tt.(*testTask)
263+
},
264+
})
265+
266+
doneset := make(map[int]bool)
267+
timeout := time.After(2 * time.Second)
268+
for len(doneset) < len(alltasks) {
269+
select {
270+
case tt := <-done:
271+
if doneset[tt.index] {
272+
t.Errorf("task %d got done more than once", tt.index)
273+
} else {
274+
doneset[tt.index] = true
275+
}
276+
case <-timeout:
277+
t.Errorf("%d of %d tasks got done within 2s", len(doneset), len(alltasks))
278+
for i := 0; i < len(alltasks); i++ {
279+
if !doneset[i] {
280+
t.Logf("task %d not done", i)
281+
}
282+
}
283+
return
284+
}
285+
}
286+
}
287+
238288
type taskgen struct {
239289
newFunc func(running int, peers map[discover.NodeID]*Peer) []task
240290
doneFunc func(task)

0 commit comments

Comments
 (0)