Skip to content

Commit 77304d8

Browse files
authored
fix(sensor): use iterator for DNS discovery (#738)
1 parent ad2c5eb commit 77304d8

File tree

1 file changed

+29
-19
lines changed

1 file changed

+29
-19
lines changed

cmd/p2p/sensor/sensor.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,11 @@ var SensorCmd = &cobra.Command{
242242
defer sub.Unsubscribe()
243243

244244
ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds.
245-
hourlyTicker := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour.
245+
ticker1h := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour.
246246
defer ticker.Stop()
247-
defer hourlyTicker.Stop()
247+
defer ticker1h.Stop()
248248

249+
dnsLock := make(chan struct{}, 1)
249250
signals := make(chan os.Signal, 1)
250251
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
251252

@@ -263,7 +264,7 @@ var SensorCmd = &cobra.Command{
263264
go handleRPC(conns, inputSensorParams.NetworkID)
264265

265266
// Run DNS discovery immediately at startup.
266-
go handleDNSDiscovery(&server)
267+
go handleDNSDiscovery(&server, dnsLock)
267268

268269
for {
269270
select {
@@ -283,8 +284,8 @@ var SensorCmd = &cobra.Command{
283284
if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil {
284285
log.Error().Err(err).Msg("Failed to write nodes to file")
285286
}
286-
case <-hourlyTicker.C:
287-
go handleDNSDiscovery(&server)
287+
case <-ticker1h.C:
288+
go handleDNSDiscovery(&server, dnsLock)
288289
case <-signals:
289290
// This gracefully stops the sensor so that the peers can be written to
290291
// the nodes file.
@@ -324,31 +325,37 @@ func handlePrometheus() {
324325
}
325326

326327
// handleDNSDiscovery performs DNS-based peer discovery and adds new peers to
327-
// the p2p server. It syncs the DNS discovery tree and adds any newly discovered
328-
// peers not already in the peers map.
329-
func handleDNSDiscovery(server *ethp2p.Server) {
328+
// the p2p server. It uses an iterator to discover peers incrementally rather
329+
// than loading all nodes at once. The lock channel prevents concurrent runs.
330+
func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) {
330331
if len(inputSensorParams.DiscoveryDNS) == 0 {
331332
return
332333
}
333334

335+
select {
336+
case lock <- struct{}{}:
337+
defer func() { <-lock }()
338+
default:
339+
log.Warn().Msg("DNS discovery already running, skipping")
340+
return
341+
}
342+
334343
log.Info().
335344
Str("discovery-dns", inputSensorParams.DiscoveryDNS).
336-
Msg("Starting DNS discovery sync")
345+
Msg("Starting DNS discovery")
337346

338347
client := dnsdisc.NewClient(dnsdisc.Config{})
339-
tree, err := client.SyncTree(inputSensorParams.DiscoveryDNS)
348+
iter, err := client.NewIterator(inputSensorParams.DiscoveryDNS)
340349
if err != nil {
341-
log.Error().Err(err).Msg("Failed to sync DNS discovery tree")
350+
log.Error().Err(err).Msg("Failed to create DNS discovery iterator")
342351
return
343352
}
353+
defer iter.Close()
344354

345-
// Log the number of nodes in the tree.
346-
log.Info().
347-
Int("unique_nodes", len(tree.Nodes())).
348-
Msg("Successfully synced DNS discovery tree")
349-
350-
// Add DNS-discovered peers.
351-
for _, node := range tree.Nodes() {
355+
// Add DNS-discovered peers using the iterator.
356+
count := 0
357+
for iter.Next() {
358+
node := iter.Node()
352359
log.Debug().
353360
Str("enode", node.URLv4()).
354361
Msg("Discovered peer through DNS")
@@ -357,9 +364,12 @@ func handleDNSDiscovery(server *ethp2p.Server) {
357364
// connect to the peer if it's already connected. If a node is part of the
358365
// static peer set, the server will handle reconnecting after disconnects.
359366
server.AddPeer(node)
367+
count++
360368
}
361369

362-
log.Info().Msg("Finished adding DNS discovery peers")
370+
log.Info().
371+
Int("discovered_peers", count).
372+
Msg("Finished DNS discovery")
363373
}
364374

365375
// getLatestBlock will get the latest block from an RPC provider.

0 commit comments

Comments
 (0)