Skip to content

Commit 1440f9a

Browse files
committed
p2p: new dialer, peer management without locks
The most visible change is event-based dialing, which should be an improvement over the timer-based system that we have at the moment. The dialer gets a chance to compute new tasks whenever peers change or dials complete. This is better than checking peers on a timer because dials happen faster. The dialer can now make more precise decisions about whom to dial based on the peer set and we can test those decisions without actually opening any sockets. Peer management is easier to test because the tests can inject connections at checkpoints (after enc handshake, after protocol handshake). Most of the handshake stuff is now part of the RLPx code. It could be exported or move to its own package because it is no longer entangled with Server logic.
1 parent 9f38ef5 commit 1440f9a

File tree

11 files changed

+2118
-1329
lines changed

11 files changed

+2118
-1329
lines changed

p2p/dial.go

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
package p2p
2+
3+
import (
4+
"container/heap"
5+
"crypto/rand"
6+
"fmt"
7+
"net"
8+
"time"
9+
10+
"github.com/ethereum/go-ethereum/logger"
11+
"github.com/ethereum/go-ethereum/logger/glog"
12+
"github.com/ethereum/go-ethereum/p2p/discover"
13+
)
14+
15+
const (
16+
// This is the amount of time spent waiting in between
17+
// redialing a certain node.
18+
dialHistoryExpiration = 30 * time.Second
19+
20+
// Discovery lookup tasks will wait for this long when
21+
// no results are returned. This can happen if the table
22+
// becomes empty (i.e. not often).
23+
emptyLookupDelay = 10 * time.Second
24+
)
25+
26+
// dialstate schedules dials and discovery lookups.
27+
// it get's a chance to compute new tasks on every iteration
28+
// of the main loop in Server.run.
29+
type dialstate struct {
30+
maxDynDials int
31+
ntab discoverTable
32+
33+
lookupRunning bool
34+
bootstrapped bool
35+
36+
dialing map[discover.NodeID]connFlag
37+
lookupBuf []*discover.Node // current discovery lookup results
38+
randomNodes []*discover.Node // filled from Table
39+
static map[discover.NodeID]*discover.Node
40+
hist *dialHistory
41+
}
42+
43+
type discoverTable interface {
44+
Self() *discover.Node
45+
Close()
46+
Bootstrap([]*discover.Node)
47+
Lookup(target discover.NodeID) []*discover.Node
48+
ReadRandomNodes([]*discover.Node) int
49+
}
50+
51+
// the dial history remembers recent dials.
52+
type dialHistory []pastDial
53+
54+
// pastDial is an entry in the dial history.
55+
type pastDial struct {
56+
id discover.NodeID
57+
exp time.Time
58+
}
59+
60+
type task interface {
61+
Do(*Server)
62+
}
63+
64+
// A dialTask is generated for each node that is dialed.
65+
type dialTask struct {
66+
flags connFlag
67+
dest *discover.Node
68+
}
69+
70+
// discoverTask runs discovery table operations.
71+
// Only one discoverTask is active at any time.
72+
//
73+
// If bootstrap is true, the task runs Table.Bootstrap,
74+
// otherwise it performs a random lookup and leaves the
75+
// results in the task.
76+
type discoverTask struct {
77+
bootstrap bool
78+
results []*discover.Node
79+
}
80+
81+
// A waitExpireTask is generated if there are no other tasks
82+
// to keep the loop in Server.run ticking.
83+
type waitExpireTask struct {
84+
time.Duration
85+
}
86+
87+
func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dialstate {
88+
s := &dialstate{
89+
maxDynDials: maxdyn,
90+
ntab: ntab,
91+
static: make(map[discover.NodeID]*discover.Node),
92+
dialing: make(map[discover.NodeID]connFlag),
93+
randomNodes: make([]*discover.Node, maxdyn/2),
94+
hist: new(dialHistory),
95+
}
96+
for _, n := range static {
97+
s.static[n.ID] = n
98+
}
99+
return s
100+
}
101+
102+
func (s *dialstate) addStatic(n *discover.Node) {
103+
s.static[n.ID] = n
104+
}
105+
106+
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
107+
var newtasks []task
108+
addDial := func(flag connFlag, n *discover.Node) bool {
109+
_, dialing := s.dialing[n.ID]
110+
if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) {
111+
return false
112+
}
113+
s.dialing[n.ID] = flag
114+
newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
115+
return true
116+
}
117+
118+
// Compute number of dynamic dials necessary at this point.
119+
needDynDials := s.maxDynDials
120+
for _, p := range peers {
121+
if p.rw.is(dynDialedConn) {
122+
needDynDials--
123+
}
124+
}
125+
for _, flag := range s.dialing {
126+
if flag&dynDialedConn != 0 {
127+
needDynDials--
128+
}
129+
}
130+
131+
// Expire the dial history on every invocation.
132+
s.hist.expire(now)
133+
134+
// Create dials for static nodes if they are not connected.
135+
for _, n := range s.static {
136+
addDial(staticDialedConn, n)
137+
}
138+
139+
// Use random nodes from the table for half of the necessary
140+
// dynamic dials.
141+
randomCandidates := needDynDials / 2
142+
if randomCandidates > 0 && s.bootstrapped {
143+
n := s.ntab.ReadRandomNodes(s.randomNodes)
144+
for i := 0; i < randomCandidates && i < n; i++ {
145+
if addDial(dynDialedConn, s.randomNodes[i]) {
146+
needDynDials--
147+
}
148+
}
149+
}
150+
// Create dynamic dials from random lookup results, removing tried
151+
// items from the result buffer.
152+
i := 0
153+
for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
154+
if addDial(dynDialedConn, s.lookupBuf[i]) {
155+
needDynDials--
156+
}
157+
}
158+
s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
159+
// Launch a discovery lookup if more candidates are needed. The
160+
// first discoverTask bootstraps the table and won't return any
161+
// results.
162+
if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
163+
s.lookupRunning = true
164+
newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped})
165+
}
166+
167+
// Launch a timer to wait for the next node to expire if all
168+
// candidates have been tried and no task is currently active.
169+
// This should prevent cases where the dialer logic is not ticked
170+
// because there are no pending events.
171+
if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
172+
t := &waitExpireTask{s.hist.min().exp.Sub(now)}
173+
newtasks = append(newtasks, t)
174+
}
175+
return newtasks
176+
}
177+
178+
func (s *dialstate) taskDone(t task, now time.Time) {
179+
switch t := t.(type) {
180+
case *dialTask:
181+
s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration))
182+
delete(s.dialing, t.dest.ID)
183+
case *discoverTask:
184+
if t.bootstrap {
185+
s.bootstrapped = true
186+
}
187+
s.lookupRunning = false
188+
s.lookupBuf = append(s.lookupBuf, t.results...)
189+
}
190+
}
191+
192+
func (t *dialTask) Do(srv *Server) {
193+
addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}
194+
glog.V(logger.Debug).Infof("dialing %v\n", t.dest)
195+
fd, err := srv.Dialer.Dial("tcp", addr.String())
196+
if err != nil {
197+
glog.V(logger.Detail).Infof("dial error: %v", err)
198+
return
199+
}
200+
srv.setupConn(fd, t.flags, t.dest)
201+
}
202+
func (t *dialTask) String() string {
203+
return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
204+
}
205+
206+
func (t *discoverTask) Do(srv *Server) {
207+
if t.bootstrap {
208+
srv.ntab.Bootstrap(srv.BootstrapNodes)
209+
} else {
210+
var target discover.NodeID
211+
rand.Read(target[:])
212+
t.results = srv.ntab.Lookup(target)
213+
// newTasks generates a lookup task whenever dynamic dials are
214+
// necessary. Lookups need to take some time, otherwise the
215+
// event loop spins too fast. An empty result can only be
216+
// returned if the table is empty.
217+
if len(t.results) == 0 {
218+
time.Sleep(emptyLookupDelay)
219+
}
220+
}
221+
}
222+
223+
func (t *discoverTask) String() (s string) {
224+
if t.bootstrap {
225+
s = "discovery bootstrap"
226+
} else {
227+
s = "discovery lookup"
228+
}
229+
if len(t.results) > 0 {
230+
s += fmt.Sprintf(" (%d results)", len(t.results))
231+
}
232+
return s
233+
}
234+
235+
func (t waitExpireTask) Do(*Server) {
236+
time.Sleep(t.Duration)
237+
}
238+
func (t waitExpireTask) String() string {
239+
return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration)
240+
}
241+
242+
// Use only these methods to access or modify dialHistory.
243+
func (h dialHistory) min() pastDial {
244+
return h[0]
245+
}
246+
func (h *dialHistory) add(id discover.NodeID, exp time.Time) {
247+
heap.Push(h, pastDial{id, exp})
248+
}
249+
func (h dialHistory) contains(id discover.NodeID) bool {
250+
for _, v := range h {
251+
if v.id == id {
252+
return true
253+
}
254+
}
255+
return false
256+
}
257+
func (h *dialHistory) expire(now time.Time) {
258+
for h.Len() > 0 && h.min().exp.Before(now) {
259+
heap.Pop(h)
260+
}
261+
}
262+
263+
// heap.Interface boilerplate
264+
func (h dialHistory) Len() int { return len(h) }
265+
func (h dialHistory) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) }
266+
func (h dialHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
267+
func (h *dialHistory) Push(x interface{}) {
268+
*h = append(*h, x.(pastDial))
269+
}
270+
func (h *dialHistory) Pop() interface{} {
271+
old := *h
272+
n := len(old)
273+
x := old[n-1]
274+
*h = old[0 : n-1]
275+
return x
276+
}

0 commit comments

Comments
 (0)