Skip to content

Commit a6d3bf6

Browse files
committed
p2p/discv5: search and lookup improvement
1 parent 3e617f3 commit a6d3bf6

File tree

2 files changed

+99
-64
lines changed

2 files changed

+99
-64
lines changed

p2p/discv5/net.go

Lines changed: 85 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,15 @@ type topicRegisterReq struct {
126126
}
127127

128128
type topicSearchReq struct {
129-
topic Topic
130-
found chan<- string
129+
topic Topic
130+
found chan<- *Node
131+
lookup chan<- bool
132+
delay time.Duration
133+
}
134+
135+
type topicSearchResult struct {
136+
target lookupInfo
137+
nodes []*Node
131138
}
132139

133140
type timeoutEvent struct {
@@ -263,16 +270,23 @@ func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node {
263270
break
264271
}
265272
// Wait for the next reply.
266-
for _, n := range <-reply {
267-
if n != nil && !seen[n.ID] {
268-
seen[n.ID] = true
269-
result.push(n, bucketSize)
270-
if stopOnMatch && n.sha == target {
271-
return result.entries
273+
select {
274+
case nodes := <-reply:
275+
for _, n := range nodes {
276+
if n != nil && !seen[n.ID] {
277+
seen[n.ID] = true
278+
result.push(n, bucketSize)
279+
if stopOnMatch && n.sha == target {
280+
return result.entries
281+
}
272282
}
273283
}
284+
pendingQueries--
285+
case <-time.After(respTimeout):
286+
// forget all pending requests, start new ones
287+
pendingQueries = 0
288+
reply = make(chan []*Node, alpha)
274289
}
275-
pendingQueries--
276290
}
277291
return result.entries
278292
}
@@ -293,18 +307,20 @@ func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) {
293307
}
294308
}
295309

296-
func (net *Network) SearchTopic(topic Topic, stop <-chan struct{}, found chan<- string) {
297-
select {
298-
case net.topicSearchReq <- topicSearchReq{topic, found}:
299-
case <-net.closed:
300-
return
301-
}
302-
select {
303-
case <-net.closed:
304-
case <-stop:
310+
func (net *Network) SearchTopic(topic Topic, setPeriod <-chan time.Duration, found chan<- *Node, lookup chan<- bool) {
311+
for {
305312
select {
306-
case net.topicSearchReq <- topicSearchReq{topic, nil}:
307313
case <-net.closed:
314+
return
315+
case delay, ok := <-setPeriod:
316+
select {
317+
case net.topicSearchReq <- topicSearchReq{topic: topic, found: found, lookup: lookup, delay: delay}:
318+
case <-net.closed:
319+
return
320+
}
321+
if !ok {
322+
return
323+
}
308324
}
309325
}
310326
}
@@ -347,6 +363,13 @@ func (net *Network) reqTableOp(f func()) (called bool) {
347363

348364
// TODO: external address handling.
349365

366+
type topicSearchInfo struct {
367+
lookupChn chan<- bool
368+
period time.Duration
369+
}
370+
371+
const maxSearchCount = 5
372+
350373
func (net *Network) loop() {
351374
var (
352375
refreshTimer = time.NewTicker(autoRefreshInterval)
@@ -385,10 +408,12 @@ func (net *Network) loop() {
385408
topicRegisterLookupTarget lookupInfo
386409
topicRegisterLookupDone chan []*Node
387410
topicRegisterLookupTick = time.NewTimer(0)
388-
topicSearchLookupTarget lookupInfo
389411
searchReqWhenRefreshDone []topicSearchReq
412+
searchInfo = make(map[Topic]topicSearchInfo)
413+
activeSearchCount int
390414
)
391-
topicSearchLookupDone := make(chan []*Node, 1)
415+
topicSearchLookupDone := make(chan topicSearchResult, 100)
416+
topicSearch := make(chan Topic, 100)
392417
<-topicRegisterLookupTick.C
393418

394419
statsDump := time.NewTicker(10 * time.Second)
@@ -504,21 +529,52 @@ loop:
504529
case req := <-net.topicSearchReq:
505530
if refreshDone == nil {
506531
debugLog("<-net.topicSearchReq")
507-
if req.found == nil {
508-
net.ticketStore.removeSearchTopic(req.topic)
532+
info, ok := searchInfo[req.topic]
533+
if ok {
534+
if req.delay == time.Duration(0) {
535+
delete(searchInfo, req.topic)
536+
net.ticketStore.removeSearchTopic(req.topic)
537+
} else {
538+
info.period = req.delay
539+
searchInfo[req.topic] = info
540+
}
509541
continue
510542
}
511-
net.ticketStore.addSearchTopic(req.topic, req.found)
512-
if (topicSearchLookupTarget.target == common.Hash{}) {
513-
topicSearchLookupDone <- nil
543+
if req.delay != time.Duration(0) {
544+
var info topicSearchInfo
545+
info.period = req.delay
546+
info.lookupChn = req.lookup
547+
searchInfo[req.topic] = info
548+
net.ticketStore.addSearchTopic(req.topic, req.found)
549+
topicSearch <- req.topic
514550
}
515551
} else {
516552
searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
517553
}
518554

519-
case nodes := <-topicSearchLookupDone:
520-
debugLog("<-topicSearchLookupDone")
521-
net.ticketStore.searchLookupDone(topicSearchLookupTarget, nodes, func(n *Node) []byte {
555+
case topic := <-topicSearch:
556+
if activeSearchCount < maxSearchCount {
557+
activeSearchCount++
558+
target := net.ticketStore.nextSearchLookup(topic)
559+
go func() {
560+
nodes := net.lookup(target.target, false)
561+
topicSearchLookupDone <- topicSearchResult{target: target, nodes: nodes}
562+
}()
563+
}
564+
period := searchInfo[topic].period
565+
if period != time.Duration(0) {
566+
go func() {
567+
time.Sleep(period)
568+
topicSearch <- topic
569+
}()
570+
}
571+
572+
case res := <-topicSearchLookupDone:
573+
activeSearchCount--
574+
if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil {
575+
lookupChn <- net.ticketStore.radius[res.target.topic].converged
576+
}
577+
net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte {
522578
net.ping(n, n.addr())
523579
return n.pingEcho
524580
}, func(n *Node, topic Topic) []byte {
@@ -531,11 +587,6 @@ loop:
531587
return nil
532588
}
533589
})
534-
topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
535-
target := topicSearchLookupTarget.target
536-
if (target != common.Hash{}) {
537-
go func() { topicSearchLookupDone <- net.lookup(target, false) }()
538-
}
539590

540591
case <-statsDump.C:
541592
debugLog("<-statsDump.C")

p2p/discv5/ticket.go

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,12 @@ type ticketStore struct {
138138
nextTicketReg mclock.AbsTime
139139

140140
searchTopicMap map[Topic]searchTopic
141-
searchTopicList []Topic
142-
searchTopicPtr int
143141
nextTopicQueryCleanup mclock.AbsTime
144142
queriesSent map[*Node]map[common.Hash]sentQuery
145-
radiusLookupCnt int
146143
}
147144

148145
type searchTopic struct {
149-
foundChn chan<- string
150-
listIdx int
146+
foundChn chan<- *Node
151147
}
152148

153149
type sentQuery struct {
@@ -183,23 +179,15 @@ func (s *ticketStore) addTopic(t Topic, register bool) {
183179
}
184180
}
185181

186-
func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- string) {
182+
func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
187183
s.addTopic(t, false)
188184
if s.searchTopicMap[t].foundChn == nil {
189-
s.searchTopicList = append(s.searchTopicList, t)
190-
s.searchTopicMap[t] = searchTopic{foundChn: foundChn, listIdx: len(s.searchTopicList) - 1}
185+
s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
191186
}
192187
}
193188

194189
func (s *ticketStore) removeSearchTopic(t Topic) {
195190
if st := s.searchTopicMap[t]; st.foundChn != nil {
196-
lastIdx := len(s.searchTopicList) - 1
197-
lastTopic := s.searchTopicList[lastIdx]
198-
s.searchTopicList[st.listIdx] = lastTopic
199-
sl := s.searchTopicMap[lastTopic]
200-
sl.listIdx = st.listIdx
201-
s.searchTopicMap[lastTopic] = sl
202-
s.searchTopicList = s.searchTopicList[:lastIdx]
203191
delete(s.searchTopicMap, t)
204192
}
205193
}
@@ -247,20 +235,13 @@ func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Durati
247235
return lookupInfo{}, 40 * time.Second
248236
}
249237

250-
func (s *ticketStore) nextSearchLookup() lookupInfo {
251-
if len(s.searchTopicList) == 0 {
252-
return lookupInfo{}
253-
}
254-
if s.searchTopicPtr >= len(s.searchTopicList) {
255-
s.searchTopicPtr = 0
256-
}
257-
topic := s.searchTopicList[s.searchTopicPtr]
258-
s.searchTopicPtr++
259-
target := s.radius[topic].nextTarget(s.radiusLookupCnt >= searchForceQuery)
238+
func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
239+
tr := s.radius[topic]
240+
target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
260241
if target.radiusLookup {
261-
s.radiusLookupCnt++
242+
tr.radiusLookupCnt++
262243
} else {
263-
s.radiusLookupCnt = 0
244+
tr.radiusLookupCnt = 0
264245
}
265246
return target
266247
}
@@ -662,9 +643,9 @@ func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNod
662643
if ip.IsUnspecified() || ip.IsLoopback() {
663644
ip = from.IP
664645
}
665-
enode := NewNode(node.ID, ip, node.UDP-1, node.TCP-1).String() // subtract one from port while discv5 is running in test mode on UDPport+1
646+
n := NewNode(node.ID, ip, node.UDP-1, node.TCP-1) // subtract one from port while discv5 is running in test mode on UDPport+1
666647
select {
667-
case chn <- enode:
648+
case chn <- n:
668649
default:
669650
return false
670651
}
@@ -677,6 +658,8 @@ type topicRadius struct {
677658
topicHashPrefix uint64
678659
radius, minRadius uint64
679660
buckets []topicRadiusBucket
661+
converged bool
662+
radiusLookupCnt int
680663
}
681664

682665
type topicRadiusEvent int
@@ -706,7 +689,7 @@ func (b *topicRadiusBucket) update(now mclock.AbsTime) {
706689
b.lastTime = now
707690

708691
for target, tm := range b.lookupSent {
709-
if now-tm > mclock.AbsTime(pingTimeout) {
692+
if now-tm > mclock.AbsTime(respTimeout) {
710693
b.weights[trNoAdjust] += 1
711694
delete(b.lookupSent, target)
712695
}
@@ -906,6 +889,7 @@ func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
906889

907890
if radiusLookup == -1 {
908891
// no more radius lookups needed at the moment, return a radius
892+
r.converged = true
909893
rad := maxBucket
910894
if minRadBucket < rad {
911895
rad = minRadBucket

0 commit comments

Comments
 (0)