
Commit 967d8de

holiman and fjl authored
eth/downloader: fix peer idleness tracking when restarting state sync (#21260)
This fixes two issues with state sync restarts:

When sync restarts with a new root, some peers can have in-flight requests. Since all peers with active requests were marked idle when exiting sync, the new sync would schedule more requests for those peers. When the response for the earlier request arrived, the new sync would reject it and mark the peer idle again, rendering the peer useless until it disconnected.

The other issue was that peers would not be marked idle when they had delivered a response, but the response hadn't been processed before restarting the state sync. This also made the peer useless because it would be permanently marked busy.

Co-authored-by: Felix Lange <[email protected]>
1 parent 7a556ab commit 967d8de

2 files changed: +67 −35 lines changed


eth/downloader/downloader.go

Lines changed: 9 additions & 22 deletions
@@ -328,7 +328,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		return err
 	}
 
-	if errors.Is(err, errInvalidChain) {
+	if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
+		errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
+		errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
 		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
 		if d.dropPeer == nil {
 			// The dropPeer method is nil when `--copydb` is used for a local copy.
@@ -339,22 +341,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		}
 		return err
 	}
-
-	switch err {
-	case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
-		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
-		errInvalidAncestor:
-		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
-		if d.dropPeer == nil {
-			// The dropPeer method is nil when `--copydb` is used for a local copy.
-			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
-			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
-		} else {
-			d.dropPeer(id)
-		}
-	default:
-		log.Warn("Synchronisation failed, retrying", "err", err)
-	}
+	log.Warn("Synchronisation failed, retrying", "err", err)
 	return err
 }
 
@@ -643,7 +630,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
 			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Debug("Multiple headers for single request", "headers", len(headers))
-				return nil, errBadPeer
+				return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			head := headers[0]
 			if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
@@ -876,7 +863,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 			headers := packer.(*headerPack).headers
 			if len(headers) != 1 {
 				p.log.Debug("Multiple headers for single request", "headers", len(headers))
-				return 0, errBadPeer
+				return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			arrived = true
 
@@ -900,7 +887,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
 				if header.Number.Uint64() != check {
 					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
-					return 0, errBadPeer
+					return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
 				}
 				start = check
 				hash = h
@@ -1092,7 +1079,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
 			case d.headerProcCh <- nil:
 			case <-d.cancelCh:
 			}
-			return errBadPeer
+			return fmt.Errorf("%w: header request timed out", errBadPeer)
 		}
 	}
 }
@@ -1520,7 +1507,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 				inserts := d.queue.Schedule(chunk, origin)
 				if len(inserts) != len(chunk) {
 					log.Debug("Stale headers")
-					return errBadPeer
+					return fmt.Errorf("%w: stale headers", errBadPeer)
 				}
 			}
 			headers = headers[limit:]
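Editor's note: the downloader.go changes lean on Go error wrapping. The sentinel errors are no longer returned bare but wrapped with %w, and Synchronise collapses the old switch over sentinels into a single errors.Is chain. A small standalone sketch of why the wrapped values still match; the errBadPeer value below is a local stand-in, not the downloader's actual variable:

package main

import (
	"errors"
	"fmt"
)

// Local stand-in for the downloader's errBadPeer sentinel.
var errBadPeer = errors.New("bad peer")

// fetch mirrors the new return style in fetchHeight/findAncestor/fetchHeaders:
// the sentinel is wrapped with extra context via %w.
func fetch(headers int) error {
	if headers != 1 {
		return fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, headers)
	}
	return nil
}

func main() {
	err := fetch(3)
	// errors.Is unwraps the %w chain, so the wrapped error still matches the
	// sentinel, which is why Synchronise can keep dropping peers through a
	// single errors.Is check instead of the old switch statement.
	fmt.Println(errors.Is(err, errBadPeer)) // true
	fmt.Println(err)                        // bad peer: multiple headers (3) for single request
}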

eth/downloader/statesync.go

Lines changed: 58 additions & 13 deletions
@@ -63,6 +63,10 @@ func (d *Downloader) syncState(root common.Hash) *stateSync {
 	s := newStateSync(d, root)
 	select {
 	case d.stateSyncStart <- s:
+		// If we tell the statesync to restart with a new root, we also need
+		// to wait for it to actually also start -- when old requests have timed
+		// out or been delivered
+		<-s.started
 	case <-d.quitCh:
 		s.err = errCancelStateFetch
 		close(s.done)
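Editor's note: the new started channel turns the restart into a handshake. syncState now blocks on <-s.started, and the channel is only closed once the new sync loop is actually running, after the previous sync's in-flight requests have been drained (see the runStateSync and loop changes below). A minimal, self-contained sketch of the pattern; the names demoSync and handoff are illustrative, not the downloader's own:

package main

import (
	"fmt"
	"time"
)

// demoSync is a stripped-down stand-in for the downloader's stateSync;
// only the started channel matters here.
type demoSync struct {
	started chan struct{} // closed once the sync loop is actually running
}

func newDemoSync() *demoSync {
	return &demoSync{started: make(chan struct{})}
}

func (s *demoSync) loop() {
	close(s.started) // closing never blocks and wakes every waiter
	// ... normal sync work would continue here ...
}

// handoff plays the role of runStateSync: it first drains whatever the
// previous sync left in flight, then starts the new sync's loop.
func handoff(next *demoSync) {
	time.Sleep(100 * time.Millisecond) // stand-in for spindownStateSync
	go next.loop()
}

func main() {
	s := newDemoSync()
	go handoff(s)
	<-s.started // syncState's new wait: blocks until the old sync is spun down
	fmt.Println("previous sync drained, new sync running, peers idle")
}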
@@ -95,15 +99,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 		finished []*stateReq                  // Completed or failed requests
 		timeout  = make(chan *stateReq)       // Timed out active requests
 	)
-	defer func() {
-		// Cancel active request timers on exit. Also set peers to idle so they're
-		// available for the next sync.
-		for _, req := range active {
-			req.timer.Stop()
-			req.peer.SetNodeDataIdle(len(req.items))
-		}
-	}()
+
 	// Run the state sync.
+	log.Trace("State sync starting", "root", s.root)
 	go s.run()
 	defer s.Cancel()
 
@@ -126,9 +124,11 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 		select {
 		// The stateSync lifecycle:
 		case next := <-d.stateSyncStart:
+			d.spindownStateSync(active, finished, timeout, peerDrop)
 			return next
 
 		case <-s.done:
+			d.spindownStateSync(active, finished, timeout, peerDrop)
 			return nil
 
 		// Send the next finished request to the current sync:
@@ -189,11 +189,9 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 			// causes valid requests to go missing and sync to get stuck.
 			if old := active[req.peer.id]; old != nil {
 				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
-
-				// Make sure the previous one doesn't get siletly lost
+				// Move the previous request to the finished set
 				old.timer.Stop()
 				old.dropped = true
-
 				finished = append(finished, old)
 			}
 			// Start a timer to notify the sync loop if the peer stalled.
@@ -210,6 +208,46 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 	}
 }
 
+// spindownStateSync 'drains' the outstanding requests; some will be delivered and other
+// will time out. This is to ensure that when the next stateSync starts working, all peers
+// are marked as idle and de facto _are_ idle.
+func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*stateReq, timeout chan *stateReq, peerDrop chan *peerConnection) {
+	log.Trace("State sync spinning down", "active", len(active), "finished", len(finished))
+
+	for len(active) > 0 {
+		var (
+			req    *stateReq
+			reason string
+		)
+		select {
+		// Handle (drop) incoming state packs:
+		case pack := <-d.stateCh:
+			req = active[pack.PeerId()]
+			reason = "delivered"
+		// Handle dropped peer connections:
+		case p := <-peerDrop:
+			req = active[p.id]
+			reason = "peerdrop"
+		// Handle timed-out requests:
+		case req = <-timeout:
+			reason = "timeout"
+		}
+		if req == nil {
+			continue
+		}
+		req.peer.log.Trace("State peer marked idle (spindown)", "req.items", len(req.items), "reason", reason)
+		req.timer.Stop()
+		delete(active, req.peer.id)
+		req.peer.SetNodeDataIdle(len(req.items))
+	}
+	// The 'finished' set contains deliveries that we were going to pass to processing.
+	// Those are now moot, but we still need to set those peers as idle, which would
+	// otherwise have been done after processing
+	for _, req := range finished {
+		req.peer.SetNodeDataIdle(len(req.items))
+	}
+}
+
 // stateSync schedules requests for downloading a particular state trie defined
 // by a given state root.
 type stateSync struct {
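Editor's note: the spindown loop terminates because every entry in the active map resolves through exactly one of the three select cases, either delivered, dropped, or timed out, and each resolution removes the entry and idles its peer exactly once. A self-contained toy simulation of that drain shape, with stand-in names (toyReq, delivered, dropped, timedOut) that are not the downloader's:

package main

import (
	"fmt"
	"time"
)

// toyReq stands in for the downloader's stateReq; only the peer id matters here.
type toyReq struct{ peer string }

func main() {
	var (
		active    = map[string]*toyReq{"p1": {"p1"}, "p2": {"p2"}, "p3": {"p3"}}
		delivered = make(chan string)  // plays the role of d.stateCh
		dropped   = make(chan string)  // plays the role of peerDrop
		timedOut  = make(chan *toyReq) // plays the role of the timeout channel
	)
	// Every in-flight request eventually resolves through exactly one channel:
	// p1 answers, p2 disconnects, p3 never answers and its timer fires.
	p3 := active["p3"]
	go func() { delivered <- "p1" }()
	go func() { dropped <- "p2" }()
	go func() { time.Sleep(50 * time.Millisecond); timedOut <- p3 }()

	// Drain loop in the same shape as spindownStateSync: each key is removed
	// after its single resolution, so the loop ends with every peer idle.
	for len(active) > 0 {
		var (
			req    *toyReq
			reason string
		)
		select {
		case id := <-delivered:
			req, reason = active[id], "delivered"
		case id := <-dropped:
			req, reason = active[id], "peerdrop"
		case req = <-timedOut:
			reason = "timeout"
		}
		if req == nil {
			continue // response from a peer we no longer track
		}
		delete(active, req.peer)
		fmt.Println("peer", req.peer, "marked idle:", reason)
	}
}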
@@ -222,11 +260,15 @@ type stateSync struct {
 	numUncommitted   int
 	bytesUncommitted int
 
+	started chan struct{} // Started is signalled once the sync loop starts
+
 	deliver    chan *stateReq // Delivery channel multiplexing peer responses
 	cancel     chan struct{}  // Channel to signal a termination request
 	cancelOnce sync.Once      // Ensures cancel only ever gets called once
 	done       chan struct{}  // Channel to signal termination completion
 	err        error          // Any error hit during sync (set before completion)
+
+	root common.Hash
 }
 
 // stateTask represents a single trie node download task, containing a set of
@@ -246,6 +288,8 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
 		deliver: make(chan *stateReq),
 		cancel:  make(chan struct{}),
 		done:    make(chan struct{}),
+		started: make(chan struct{}),
+		root:    root,
 	}
 }
 
@@ -276,6 +320,7 @@ func (s *stateSync) Cancel() error {
 // pushed here async. The reason is to decouple processing from data receipt
 // and timeouts.
 func (s *stateSync) loop() (err error) {
+	close(s.started)
 	// Listen for new peer events to assign tasks to them
 	newPeer := make(chan *peerConnection, 1024)
 	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
@@ -331,11 +376,11 @@ func (s *stateSync) loop() (err error) {
 			}
 			// Process all the received blobs and check for stale delivery
 			delivered, err := s.process(req)
+			req.peer.SetNodeDataIdle(delivered)
 			if err != nil {
 				log.Warn("Node data write error", "err", err)
 				return err
 			}
-			req.peer.SetNodeDataIdle(delivered)
 		}
 	}
 	return nil
@@ -372,7 +417,7 @@ func (s *stateSync) assignTasks() {
 
 		// If the peer was assigned tasks to fetch, send the network request
 		if len(req.items) > 0 {
-			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items), "root", s.root)
 			select {
 			case s.d.trackStateReq <- req:
 				req.peer.FetchNodeData(req.items)
