Skip to content

Commit e33e576

Browse files
committed
p2p/discv5: fixed bootnode connect issues
1 parent a0c6649 commit e33e576

File tree

5 files changed

+110
-69
lines changed

5 files changed

+110
-69
lines changed

les/handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ func (pm *ProtocolManager) findServers() {
240240
if pm.p2pServer == nil || pm.topicDisc == nil {
241241
return
242242
}
243+
glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic))
243244
enodes := make(chan string, 100)
244245
stop := make(chan struct{})
245246
go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
@@ -280,9 +281,9 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) {
280281
} else {
281282
if pm.topicDisc != nil {
282283
go func() {
283-
glog.V(logger.Debug).Infoln("Starting topic register")
284+
glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic))
284285
pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
285-
glog.V(logger.Debug).Infoln("Stopped topic register")
286+
glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic))
286287
}()
287288
}
288289
go func() {

p2p/discv5/net.go

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ var (
4141
)
4242

4343
const (
44-
autoRefreshInterval = 1 * time.Hour
45-
seedCount = 30
46-
seedMaxAge = 5 * 24 * time.Hour
44+
autoRefreshInterval = 1 * time.Hour
45+
bucketRefreshInterval = 1 * time.Minute
46+
seedCount = 30
47+
seedMaxAge = 5 * 24 * time.Hour
4748
)
4849

4950
const testTopic = "foo"
@@ -82,7 +83,6 @@ type Network struct {
8283
tableOpResp chan struct{}
8384
topicRegisterReq chan topicRegisterReq
8485
topicSearchReq chan topicSearchReq
85-
bucketFillChn chan chan struct{}
8686

8787
// State of the main loop.
8888
tab *Table
@@ -169,7 +169,6 @@ func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, d
169169
queryReq: make(chan *findnodeQuery),
170170
topicRegisterReq: make(chan topicRegisterReq),
171171
topicSearchReq: make(chan topicSearchReq),
172-
bucketFillChn: make(chan chan struct{}, 1),
173172
nodes: make(map[NodeID]*Node),
174173
}
175174
go net.loop()
@@ -353,8 +352,9 @@ func (net *Network) reqTableOp(f func()) (called bool) {
353352

354353
func (net *Network) loop() {
355354
var (
356-
refreshTimer = time.NewTicker(autoRefreshInterval)
357-
refreshDone chan struct{} // closed when the 'refresh' lookup has ended
355+
refreshTimer = time.NewTicker(autoRefreshInterval)
356+
bucketRefreshTimer = time.NewTimer(bucketRefreshInterval)
357+
refreshDone chan struct{} // closed when the 'refresh' lookup has ended
358358
)
359359

360360
// Tracking the next ticket to register.
@@ -389,6 +389,7 @@ func (net *Network) loop() {
389389
topicRegisterLookupDone chan []*Node
390390
topicRegisterLookupTick = time.NewTimer(0)
391391
topicSearchLookupTarget lookupInfo
392+
searchReqWhenRefreshDone []topicSearchReq
392393
)
393394
topicSearchLookupDone := make(chan []*Node, 1)
394395
<-topicRegisterLookupTick.C
@@ -406,6 +407,7 @@ loop:
406407

407408
// Ingress packet handling.
408409
case pkt := <-net.read:
410+
//fmt.Println("read", pkt.ev)
409411
debugLog("<-net.read")
410412
n := net.internNode(&pkt)
411413
prestate := n.state
@@ -503,14 +505,18 @@ loop:
503505
net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
504506

505507
case req := <-net.topicSearchReq:
506-
debugLog("<-net.topicSearchReq")
507-
if req.found == nil {
508-
net.ticketStore.removeSearchTopic(req.topic)
509-
continue
510-
}
511-
net.ticketStore.addSearchTopic(req.topic, req.found)
512-
if (topicSearchLookupTarget.target == common.Hash{}) {
513-
topicSearchLookupDone <- nil
508+
if refreshDone == nil {
509+
debugLog("<-net.topicSearchReq")
510+
if req.found == nil {
511+
net.ticketStore.removeSearchTopic(req.topic)
512+
continue
513+
}
514+
net.ticketStore.addSearchTopic(req.topic, req.found)
515+
if (topicSearchLookupTarget.target == common.Hash{}) {
516+
topicSearchLookupDone <- nil
517+
}
518+
} else {
519+
searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
514520
}
515521

516522
case nodes := <-topicSearchLookupDone:
@@ -519,7 +525,14 @@ loop:
519525
net.ping(n, n.addr())
520526
return n.pingEcho
521527
}, func(n *Node, topic Topic) []byte {
522-
return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
528+
if n.state == known {
529+
return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
530+
} else {
531+
if n.state == unknown {
532+
net.ping(n, n.addr())
533+
}
534+
return nil
535+
}
523536
})
524537
topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
525538
target := topicSearchLookupTarget.target
@@ -564,9 +577,12 @@ loop:
564577
refreshDone = make(chan struct{})
565578
net.refresh(refreshDone)
566579
}
567-
case doneChn := <-net.bucketFillChn:
568-
debugLog("bucketFill")
569-
net.bucketFill(doneChn)
580+
case <-bucketRefreshTimer.C:
581+
target := net.tab.chooseBucketRefreshTarget()
582+
go func() {
583+
net.lookup(target, false)
584+
bucketRefreshTimer.Reset(bucketRefreshInterval)
585+
}()
570586
case newNursery := <-net.refreshReq:
571587
debugLog("<-net.refreshReq")
572588
if newNursery != nil {
@@ -580,6 +596,13 @@ loop:
580596
case <-refreshDone:
581597
debugLog("<-net.refreshDone")
582598
refreshDone = nil
599+
list := searchReqWhenRefreshDone
600+
searchReqWhenRefreshDone = nil
601+
go func() {
602+
for _, req := range list {
603+
net.topicSearchReq <- req
604+
}
605+
}()
583606
}
584607
}
585608
debugLog("loop stopped")
@@ -643,28 +666,13 @@ func (net *Network) refresh(done chan<- struct{}) {
643666
}()
644667
}
645668

646-
func (net *Network) bucketFill(done chan<- struct{}) {
647-
target := net.tab.chooseBucketFillTarget()
648-
go func() {
649-
net.lookup(target, false)
650-
close(done)
651-
}()
652-
}
653-
654-
func (net *Network) BucketFill() {
655-
done := make(chan struct{})
656-
select {
657-
case net.bucketFillChn <- done:
658-
<-done
659-
case <-net.closed:
660-
close(done)
661-
}
662-
}
663-
664669
// Node Interning.
665670

666671
func (net *Network) internNode(pkt *ingressPacket) *Node {
667672
if n := net.nodes[pkt.remoteID]; n != nil {
673+
n.IP = pkt.remoteAddr.IP
674+
n.UDP = uint16(pkt.remoteAddr.Port)
675+
n.TCP = uint16(pkt.remoteAddr.Port)
668676
return n
669677
}
670678
n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port))
@@ -967,8 +975,10 @@ func init() {
967975

968976
// handle processes packets sent by n and events related to n.
969977
func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
978+
//fmt.Println("handle", n.addr().String(), n.state, ev)
970979
if pkt != nil {
971980
if err := net.checkPacket(n, ev, pkt); err != nil {
981+
//fmt.Println("check err:", err)
972982
return err
973983
}
974984
// Start the background expiration goroutine after the first
@@ -985,6 +995,7 @@ func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
985995
}
986996
next, err := n.state.handle(net, n, ev, pkt)
987997
net.transition(n, next)
998+
//fmt.Println("new state:", n.state)
988999
return err
9891000
}
9901001

@@ -1040,6 +1051,11 @@ func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) {
10401051
}
10411052

10421053
func (net *Network) ping(n *Node, addr *net.UDPAddr) {
1054+
//fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex())
1055+
if n.pingEcho != nil || n.ID == net.tab.self.ID {
1056+
//fmt.Println(" not sent")
1057+
return
1058+
}
10431059
debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8]))
10441060
n.pingTopics = net.ticketStore.regTopicSet()
10451061
n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)

p2p/discv5/table.go

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package discv5
2525
import (
2626
"crypto/rand"
2727
"encoding/binary"
28+
"fmt"
2829
"net"
2930
"sort"
3031

@@ -64,42 +65,54 @@ func newTable(ourID NodeID, ourAddr *net.UDPAddr) *Table {
6465
return tab
6566
}
6667

67-
func (tab *Table) chooseBucketFillTarget() common.Hash {
68-
bucketCount := nBuckets
69-
for bucketCount > 0 && len(tab.buckets[nBuckets-bucketCount].entries) == 0 {
70-
bucketCount--
68+
const printTable = false
69+
70+
// chooseBucketRefreshTarget selects random refresh targets to keep all Kademlia
71+
// buckets filled with live connections and keep the network topology healthy.
72+
// This requires selecting addresses closer to our own with a higher probability
73+
// in order to refresh closer buckets too.
74+
//
75+
// This algorithm approximates the distance distribution of existing nodes in the
76+
// table by selecting a random node from the table and selecting a target address
77+
// with a distance less than twice of that of the selected node.
78+
// This algorithm will be improved later to specifically target the least recently
79+
// used buckets.
80+
func (tab *Table) chooseBucketRefreshTarget() common.Hash {
81+
entries := 0
82+
if printTable {
83+
fmt.Println()
7184
}
72-
var bucket int
73-
for {
74-
// select a target hash that could go into a certain randomly selected bucket
75-
// buckets are chosen with an even chance out of the existing ones that contain
76-
// less that bucketSize entries, plus a potential new one beyond these
77-
bucket = nBuckets - 1 - int(randUint(uint32(bucketCount+1)))
78-
if bucket == bucketCount || len(tab.buckets[bucket].entries) < bucketSize {
79-
break
85+
for i, b := range tab.buckets {
86+
entries += len(b.entries)
87+
if printTable {
88+
for _, e := range b.entries {
89+
fmt.Println(i, e.state, e.addr().String(), e.ID.String(), e.sha.Hex())
90+
}
8091
}
8192
}
8293

83-
// calculate target that has the desired log distance from our own address hash
84-
target := tab.self.sha.Bytes()
85-
prefix := binary.BigEndian.Uint64(target[0:8])
86-
shift := uint(nBuckets - 1 - bucket)
87-
if bucket != bucketCount {
88-
shift++
94+
prefix := binary.BigEndian.Uint64(tab.self.sha[0:8])
95+
dist := ^uint64(0)
96+
entry := int(randUint(uint32(entries + 1)))
97+
for _, b := range tab.buckets {
98+
if entry < len(b.entries) {
99+
n := b.entries[entry]
100+
dist = binary.BigEndian.Uint64(n.sha[0:8]) ^ prefix
101+
break
102+
}
103+
entry -= len(b.entries)
89104
}
90-
var b [8]byte
91-
rand.Read(b[:])
92-
rnd := binary.BigEndian.Uint64(b[:])
93-
rndMask := (^uint64(0)) >> shift
94-
addrMask := ^rndMask
95-
xorMask := uint64(0)
96-
if bucket != bucketCount {
97-
xorMask = rndMask + 1
105+
106+
ddist := ^uint64(0)
107+
if dist+dist > dist {
108+
ddist = dist
98109
}
99-
prefix = (prefix&addrMask ^ xorMask) | (rnd & rndMask)
100-
binary.BigEndian.PutUint64(target[0:8], prefix)
110+
targetPrefix := prefix ^ randUint64n(ddist)
111+
112+
var target common.Hash
113+
binary.BigEndian.PutUint64(target[0:8], targetPrefix)
101114
rand.Read(target[8:])
102-
return common.BytesToHash(target)
115+
return target
103116
}
104117

105118
// readRandomNodes fills the given slice with random nodes from the
@@ -175,6 +188,10 @@ func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
175188
// bucket has space available, adding the node succeeds immediately.
176189
// Otherwise, the node is added to the replacement cache for the bucket.
177190
func (tab *Table) add(n *Node) (contested *Node) {
191+
//fmt.Println("add", n.addr().String(), n.ID.String(), n.sha.Hex())
192+
if n.ID == tab.self.ID {
193+
return
194+
}
178195
b := tab.buckets[logdist(tab.self.sha, n.sha)]
179196
switch {
180197
case b.bump(n):
@@ -228,6 +245,7 @@ outer:
228245
// delete removes an entry from the node table (used to evacuate
229246
// failed/non-bonded discovery peers).
230247
func (tab *Table) delete(node *Node) {
248+
//fmt.Println("delete", node.addr().String(), node.ID.String(), node.sha.Hex())
231249
bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
232250
for i := range bucket.entries {
233251
if bucket.entries[i].ID == node.ID {

p2p/discv5/ticket.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,9 @@ func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, ping fu
525525
} // else {
526526
if s.canQueryTopic(n, lookup.topic) {
527527
hash := query(n, lookup.topic)
528-
s.addTopicQuery(common.BytesToHash(hash), n, lookup)
528+
if hash != nil {
529+
s.addTopicQuery(common.BytesToHash(hash), n, lookup)
530+
}
529531
}
530532
//}
531533
}

p2p/discv5/udp.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,14 +336,17 @@ func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node)
336336
}
337337

338338
func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) {
339+
//fmt.Println("sendPacket", nodeEvent(ptype), toaddr.String(), toid.String())
339340
packet, hash, err := encodePacket(t.priv, ptype, req)
340341
if err != nil {
342+
//fmt.Println(err)
341343
return hash, err
342344
}
343345
glog.V(logger.Detail).Infof(">>> %v to %x@%v\n", nodeEvent(ptype), toid[:8], toaddr)
344346
if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
345347
glog.V(logger.Detail).Infoln("UDP send failed:", err)
346348
}
349+
//fmt.Println(err)
347350
return hash, err
348351
}
349352

@@ -406,6 +409,7 @@ func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
406409
pkt := ingressPacket{remoteAddr: from}
407410
if err := decodePacket(buf, &pkt); err != nil {
408411
glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err)
412+
//fmt.Println("bad packet", err)
409413
return err
410414
}
411415
t.net.reqReadPacket(pkt)

0 commit comments

Comments
 (0)