Skip to content

Commit c155c8e

Browse files
authored
cmd/devp2p: faster crawling + less verbose dns updates (#26697)
This improves the speed of DHT crawling by using concurrent requests. It also removes logging of individual DNS updates.
1 parent ee530c0 commit c155c8e

File tree

5 files changed

+78
-36
lines changed

5 files changed

+78
-36
lines changed

cmd/devp2p/crawl.go

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package main
1818

1919
import (
20+
"sync"
21+
"sync/atomic"
2022
"time"
2123

2224
"github.com/ethereum/go-ethereum/log"
@@ -34,6 +36,7 @@ type crawler struct {
3436

3537
// settings
3638
revalidateInterval time.Duration
39+
mu sync.RWMutex
3740
}
3841

3942
const (
@@ -67,43 +70,59 @@ func newCrawler(input nodeSet, disc resolver, iters ...enode.Iterator) *crawler
6770
return c
6871
}
6972

70-
func (c *crawler) run(timeout time.Duration) nodeSet {
73+
func (c *crawler) run(timeout time.Duration, nthreads int) nodeSet {
7174
var (
7275
timeoutTimer = time.NewTimer(timeout)
7376
timeoutCh <-chan time.Time
7477
statusTicker = time.NewTicker(time.Second * 8)
7578
doneCh = make(chan enode.Iterator, len(c.iters))
7679
liveIters = len(c.iters)
7780
)
81+
if nthreads < 1 {
82+
nthreads = 1
83+
}
7884
defer timeoutTimer.Stop()
7985
defer statusTicker.Stop()
8086
for _, it := range c.iters {
8187
go c.runIterator(doneCh, it)
8288
}
83-
8489
var (
85-
added int
86-
updated int
87-
skipped int
88-
recent int
89-
removed int
90+
added uint64
91+
updated uint64
92+
skipped uint64
93+
recent uint64
94+
removed uint64
95+
wg sync.WaitGroup
9096
)
97+
wg.Add(nthreads)
98+
for i := 0; i < nthreads; i++ {
99+
go func() {
100+
defer wg.Done()
101+
for {
102+
select {
103+
case n := <-c.ch:
104+
switch c.updateNode(n) {
105+
case nodeSkipIncompat:
106+
atomic.AddUint64(&skipped, 1)
107+
case nodeSkipRecent:
108+
atomic.AddUint64(&recent, 1)
109+
case nodeRemoved:
110+
atomic.AddUint64(&removed, 1)
111+
case nodeAdded:
112+
atomic.AddUint64(&added, 1)
113+
default:
114+
atomic.AddUint64(&updated, 1)
115+
}
116+
case <-c.closed:
117+
return
118+
}
119+
}
120+
}()
121+
}
122+
91123
loop:
92124
for {
93125
select {
94-
case n := <-c.ch:
95-
switch c.updateNode(n) {
96-
case nodeSkipIncompat:
97-
skipped++
98-
case nodeSkipRecent:
99-
recent++
100-
case nodeRemoved:
101-
removed++
102-
case nodeAdded:
103-
added++
104-
default:
105-
updated++
106-
}
107126
case it := <-doneCh:
108127
if it == c.inputIter {
109128
// Enable timeout when we're done revalidating the input nodes.
@@ -119,8 +138,11 @@ loop:
119138
break loop
120139
case <-statusTicker.C:
121140
log.Info("Crawling in progress",
122-
"added", added, "updated", updated, "removed", removed,
123-
"ignored(recent)", recent, "ignored(incompatible)", skipped)
141+
"added", atomic.LoadUint64(&added),
142+
"updated", atomic.LoadUint64(&updated),
143+
"removed", atomic.LoadUint64(&removed),
144+
"ignored(recent)", atomic.LoadUint64(&recent),
145+
"ignored(incompatible)", atomic.LoadUint64(&skipped))
124146
}
125147
}
126148

@@ -131,6 +153,7 @@ loop:
131153
for ; liveIters > 0; liveIters-- {
132154
<-doneCh
133155
}
156+
wg.Wait()
134157
return c.output
135158
}
136159

@@ -148,18 +171,19 @@ func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) {
148171
// updateNode updates the info about the given node, and returns a status
149172
// about what changed.
150173
func (c *crawler) updateNode(n *enode.Node) int {
174+
c.mu.RLock()
151175
node, ok := c.output[n.ID()]
176+
c.mu.RUnlock()
152177

153178
// Skip validation of recently-seen nodes.
154179
if ok && time.Since(node.LastCheck) < c.revalidateInterval {
155180
return nodeSkipRecent
156181
}
157182

158183
// Request the node record.
159-
nn, err := c.disc.RequestENR(n)
160-
node.LastCheck = truncNow()
161184
status := nodeUpdated
162-
if err != nil {
185+
node.LastCheck = truncNow()
186+
if nn, err := c.disc.RequestENR(n); err != nil {
163187
if node.Score == 0 {
164188
// Node doesn't implement EIP-868.
165189
log.Debug("Skipping node", "id", n.ID())
@@ -176,8 +200,9 @@ func (c *crawler) updateNode(n *enode.Node) int {
176200
}
177201
node.LastResponse = node.LastCheck
178202
}
179-
180203
// Store/update node in output set.
204+
c.mu.Lock()
205+
defer c.mu.Unlock()
181206
if node.Score <= 0 {
182207
log.Debug("Removing node", "id", n.ID())
183208
delete(c.output, n.ID())

cmd/devp2p/discv4cmd.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ var (
7878
Name: "crawl",
7979
Usage: "Updates a nodes.json file with random nodes found in the DHT",
8080
Action: discv4Crawl,
81-
Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag}),
81+
Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag}),
8282
}
8383
discv4TestCommand = &cli.Command{
8484
Name: "test",
@@ -120,6 +120,11 @@ var (
120120
Usage: "Time limit for the crawl.",
121121
Value: 30 * time.Minute,
122122
}
123+
crawlParallelismFlag = &cli.IntFlag{
124+
Name: "parallel",
125+
Usage: "How many parallel discoveries to attempt.",
126+
Value: 16,
127+
}
123128
remoteEnodeFlag = &cli.StringFlag{
124129
Name: "remote",
125130
Usage: "Enode of the remote node under test",
@@ -195,7 +200,7 @@ func discv4ResolveJSON(ctx *cli.Context) error {
195200
defer disc.Close()
196201
c := newCrawler(inputSet, disc, enode.IterNodes(nodeargs))
197202
c.revalidateInterval = 0
198-
output := c.run(0)
203+
output := c.run(0, 1)
199204
writeNodesJSON(nodesFile, output)
200205
return nil
201206
}
@@ -214,7 +219,7 @@ func discv4Crawl(ctx *cli.Context) error {
214219
defer disc.Close()
215220
c := newCrawler(inputSet, disc, disc.RandomNodes())
216221
c.revalidateInterval = 10 * time.Minute
217-
output := c.run(ctx.Duration(crawlTimeoutFlag.Name))
222+
output := c.run(ctx.Duration(crawlTimeoutFlag.Name), ctx.Int(crawlParallelismFlag.Name))
218223
writeNodesJSON(nodesFile, output)
219224
return nil
220225
}

cmd/devp2p/discv5cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func discv5Crawl(ctx *cli.Context) error {
110110
defer disc.Close()
111111
c := newCrawler(inputSet, disc, disc.RandomNodes())
112112
c.revalidateInterval = 10 * time.Minute
113-
output := c.run(ctx.Duration(crawlTimeoutFlag.Name))
113+
output := c.run(ctx.Duration(crawlTimeoutFlag.Name), ctx.Int(crawlParallelismFlag.Name))
114114
writeNodesJSON(nodesFile, output)
115115
return nil
116116
}

cmd/devp2p/dns_cloudflare.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,16 @@ func (c *cloudflareClient) uploadRecords(name string, records map[string]string)
126126
}
127127

128128
// Iterate over the new records and inject anything missing.
129+
log.Info("Updating DNS entries")
130+
created := 0
131+
updated := 0
132+
skipped := 0
129133
for path, val := range records {
130134
old, exists := existing[path]
131135
if !exists {
132136
// Entry is unknown, push a new one to Cloudflare.
133-
log.Info(fmt.Sprintf("Creating %s = %q", path, val))
137+
log.Debug(fmt.Sprintf("Creating %s = %q", path, val))
138+
created++
134139
ttl := rootTTL
135140
if path != name {
136141
ttl = treeNodeTTLCloudflare // Max TTL permitted by Cloudflare
@@ -139,27 +144,33 @@ func (c *cloudflareClient) uploadRecords(name string, records map[string]string)
139144
_, err = c.CreateDNSRecord(context.Background(), c.zoneID, record)
140145
} else if old.Content != val {
141146
// Entry already exists, only change its content.
142-
log.Info(fmt.Sprintf("Updating %s from %q to %q", path, old.Content, val))
147+
log.Debug(fmt.Sprintf("Updating %s from %q to %q", path, old.Content, val))
148+
updated++
143149
old.Content = val
144150
err = c.UpdateDNSRecord(context.Background(), c.zoneID, old.ID, old)
145151
} else {
152+
skipped++
146153
log.Debug(fmt.Sprintf("Skipping %s = %q", path, val))
147154
}
148155
if err != nil {
149156
return fmt.Errorf("failed to publish %s: %v", path, err)
150157
}
151158
}
152-
159+
log.Info("Updated DNS entries", "new", created, "updated", updated, "untouched", skipped)
153160
// Iterate over the old records and delete anything stale.
161+
deleted := 0
162+
log.Info("Deleting stale DNS entries")
154163
for path, entry := range existing {
155164
if _, ok := records[path]; ok {
156165
continue
157166
}
158167
// Stale entry, nuke it.
159-
log.Info(fmt.Sprintf("Deleting %s = %q", path, entry.Content))
168+
log.Debug(fmt.Sprintf("Deleting %s = %q", path, entry.Content))
169+
deleted++
160170
if err := c.DeleteDNSRecord(context.Background(), c.zoneID, entry.ID); err != nil {
161171
return fmt.Errorf("failed to delete %s: %v", path, err)
162172
}
163173
}
174+
log.Info("Deleted stale DNS entries", "count", deleted)
164175
return nil
165176
}

cmd/devp2p/dns_route53.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,9 @@ func (c *route53Client) collectRecords(name string) (map[string]recordSet, error
329329
var req route53.ListResourceRecordSetsInput
330330
req.HostedZoneId = &c.zoneID
331331
existing := make(map[string]recordSet)
332+
log.Info("Loading existing TXT records", "name", name, "zone", c.zoneID)
332333
for page := 0; ; page++ {
333-
log.Info("Loading existing TXT records", "name", name, "zone", c.zoneID, "page", page)
334+
log.Debug("Loading existing TXT records", "name", name, "zone", c.zoneID, "page", page)
334335
resp, err := c.api.ListResourceRecordSets(context.TODO(), &req)
335336
if err != nil {
336337
return existing, err
@@ -360,7 +361,7 @@ func (c *route53Client) collectRecords(name string) (map[string]recordSet, error
360361
req.StartRecordName = resp.NextRecordName
361362
req.StartRecordType = resp.NextRecordType
362363
}
363-
364+
log.Info("Loaded existing TXT records", "name", name, "zone", c.zoneID, "records", len(existing))
364365
return existing, nil
365366
}
366367

0 commit comments

Comments
 (0)