
Commit 76b4d03

eth/downloader: update with pruning cutoff
1 parent 1a369ec commit 76b4d03


2 files changed: +117 −31 lines changed


eth/downloader/beaconsync.go

Lines changed: 21 additions & 0 deletions
@@ -292,6 +292,27 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
 	fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
 	defer fsHeaderContCheckTimer.Stop()
 
+	// Verify the header at configured chain cutoff, ensuring it's matched with
+	// the configured hash. Skip the check if the configured cutoff is even higher
+	// than the sync target, which is definitely not a common case.
+	if d.chainCutoffNumber != 0 && d.chainCutoffNumber >= from && d.chainCutoffNumber <= head.Number.Uint64() {
+		h := d.skeleton.Header(d.chainCutoffNumber)
+		if h == nil {
+			if d.chainCutoffNumber < tail.Number.Uint64() {
+				dist := tail.Number.Uint64() - d.chainCutoffNumber
+				if len(localHeaders) >= int(dist) {
+					h = localHeaders[dist-1]
+				}
+			}
+		}
+		if h == nil {
+			return fmt.Errorf("header at chain cutoff is not available, cutoff: %d", d.chainCutoffNumber)
+		}
+		if h.Hash() != d.chainCutoffHash {
+			return fmt.Errorf("header at chain cutoff mismatched, want: %v, got: %v", d.chainCutoffHash, h.Hash())
+		}
+	}
 	for {
 		// Some beacon headers might have appeared since the last cycle, make
 		// sure we're always syncing to all available ones
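The fallback above relies on how the locally available headers below the skeleton tail are ordered. Below is a minimal standalone sketch of that index arithmetic, assuming (as the lookup implies) that localHeaders[0] is the header at tail-1, localHeaders[1] at tail-2, and so on; all numbers are made up for illustration.

package main

import "fmt"

func main() {
	tail := uint64(100)                      // hypothetical skeleton tail number
	localHeaders := []uint64{99, 98, 97, 96} // header numbers standing in for *types.Header
	cutoff := uint64(97)                     // hypothetical chain cutoff

	if cutoff < tail {
		dist := tail - cutoff // 3
		if len(localHeaders) >= int(dist) {
			fmt.Println(localHeaders[dist-1]) // 97: the header at the cutoff
		}
	}
}

With tail = 100 and a cutoff of 97, dist is 3 and localHeaders[2] is exactly the header at the cutoff, which is then checked against chainCutoffHash.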

eth/downloader/downloader.go

Lines changed: 96 additions & 31 deletions
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
+	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -121,6 +122,12 @@ type Downloader struct {
 	committed    atomic.Bool
 	ancientLimit uint64 // The maximum block number which can be regarded as ancient data.
 
+	// The cutoff block number and hash before which chain segments (bodies
+	// and receipts) are skipped during synchronization. 0 means the entire
+	// chain segment is aimed for synchronization.
+	chainCutoffNumber uint64
+	chainCutoffHash   common.Hash
+
 	// Channels
 	headerProcCh chan *headerTask // Channel to feed the header processor new tasks
 
@@ -193,6 +200,10 @@ type BlockChain interface {
 	// SnapSyncCommitHead directly commits the head block to a certain entity.
 	SnapSyncCommitHead(common.Hash) error
 
+	// InsertHeadersBeforeCutoff inserts a batch of headers before the configured
+	// chain cutoff into the ancient store.
+	InsertHeadersBeforeCutoff([]*types.Header) (int, error)
+
 	// InsertChain inserts a batch of blocks into the local chain.
 	InsertChain(types.Blocks) (int, error)
 
@@ -205,22 +216,29 @@ type BlockChain interface {
 	// TrieDB retrieves the low level trie database used for interacting
 	// with trie nodes.
 	TrieDB() *triedb.Database
+
+	// HistoryPruningCutoff returns the configured history pruning point.
+	// Block bodies along with the receipts will be skipped for synchronization.
+	HistoryPruningCutoff() (uint64, common.Hash)
 }
 
 // New creates a new downloader to fetch hashes and blocks from remote peers.
 func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
+	cutoffNumber, cutoffHash := chain.HistoryPruningCutoff()
 	dl := &Downloader{
-		stateDB:        stateDb,
-		mux:            mux,
-		queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
-		peers:          newPeerSet(),
-		blockchain:     chain,
-		dropPeer:       dropPeer,
-		headerProcCh:   make(chan *headerTask, 1),
-		quitCh:         make(chan struct{}),
-		SnapSyncer:     snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
-		stateSyncStart: make(chan *stateSync),
-		syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(),
+		stateDB:           stateDb,
+		mux:               mux,
+		queue:             newQueue(blockCacheMaxItems, blockCacheInitialItems),
+		peers:             newPeerSet(),
+		blockchain:        chain,
+		chainCutoffNumber: cutoffNumber,
+		chainCutoffHash:   cutoffHash,
+		dropPeer:          dropPeer,
+		headerProcCh:      make(chan *headerTask, 1),
+		quitCh:            make(chan struct{}),
+		SnapSyncer:        snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
+		stateSyncStart:    make(chan *stateSync),
+		syncStartBlock:    chain.CurrentSnapBlock().Number.Uint64(),
 	}
 	// Create the post-merge skeleton syncer and start the process
 	dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))
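HistoryPruningCutoff is the only new requirement the constructor places on the BlockChain interface. Below is a hedged sketch, not part of the commit, of a minimal type satisfying just that method; the cutoff values are invented for illustration.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// prunedChain is a hypothetical stand-in for a BlockChain implementation that
// only exposes the history pruning point used by the downloader constructor.
type prunedChain struct {
	cutoffNumber uint64
	cutoffHash   common.Hash
}

func (c *prunedChain) HistoryPruningCutoff() (uint64, common.Hash) {
	return c.cutoffNumber, c.cutoffHash
}

func main() {
	chain := &prunedChain{
		cutoffNumber: 1_000_000, // made-up cutoff height
		cutoffHash:   common.HexToHash("0xdeadbeef"),
	}
	number, hash := chain.HistoryPruningCutoff()
	fmt.Println("cutoff:", number, "hash:", hash) // what New feeds into chainCutoffNumber/Hash
}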
@@ -599,6 +617,12 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 			d.ancientLimit = 0
 		}
 	}
+	// Extend the ancient chain segment range if the ancient limit is even
+	// below the pre-configured chain cutoff.
+	if d.chainCutoffNumber != 0 && d.chainCutoffNumber > d.ancientLimit {
+		d.ancientLimit = d.chainCutoffNumber
+		log.Info("Extend the ancient range with configured cutoff", "cutoff", d.chainCutoffNumber)
+	}
 	frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
 
 	// If a part of blockchain data has already been written into active store,
@@ -617,8 +641,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 			log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin)
 		}
 	}
+	// Skip ancient chain segments if Geth is running with a configured chain cutoff.
+	// These segments are not guaranteed to be available in the network.
+	chainOffset := origin + 1
+	if mode == SnapSync && d.chainCutoffNumber != 0 {
+		if chainOffset < d.chainCutoffNumber {
+			chainOffset = d.chainCutoffNumber
+			log.Info("Skip chain segment before cutoff", "origin", origin, "cutoff", d.chainCutoffNumber)
+		}
+	}
 	// Initiate the sync using a concurrent header and content retrieval algorithm
-	d.queue.Prepare(origin+1, mode)
+	d.queue.Prepare(chainOffset, mode)
 	if d.syncInitHook != nil {
 		d.syncInitHook(origin, height)
 	}
@@ -632,8 +665,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 	}
 	fetchers := []func() error{
 		headerFetcher, // Headers are always retrieved
-		func() error { return d.fetchBodies(origin+1, beaconMode) },   // Bodies are retrieved during normal and snap sync
-		func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync
+		func() error { return d.fetchBodies(chainOffset, beaconMode) },   // Bodies are retrieved during normal and snap sync
+		func() error { return d.fetchReceipts(chainOffset, beaconMode) }, // Receipts are retrieved during snap sync
 		func() error { return d.processHeaders(origin+1, td, ttd, beaconMode) },
 	}
 	if mode == SnapSync {
@@ -1307,7 +1340,7 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
 				return nil
 			}
 			// Otherwise split the chunk of headers into batches and process them
-			headers, hashes := task.headers, task.hashes
+			headers, hashes, scheduled := task.headers, task.hashes, false
 
 			gotHeaders = true
 			for len(headers) > 0 {
@@ -1325,10 +1358,25 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
 				chunkHeaders := headers[:limit]
 				chunkHashes := hashes[:limit]
 
-				// In case of header only syncing, validate the chunk immediately
+				// Split the headers around the chain cutoff
+				var cutoff int
+				if mode == SnapSync && d.chainCutoffNumber != 0 {
+					cutoff = sort.Search(len(chunkHeaders), func(i int) bool {
+						return chunkHeaders[i].Number.Uint64() >= d.chainCutoffNumber
+					})
+				}
+				// Insert the header chain into the ancient store (with block bodies and
+				// receipts set to nil) if they fall before the cutoff.
 				if mode == SnapSync {
-					if len(chunkHeaders) > 0 {
-						if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil {
+					if cutoff != 0 {
+						if n, err := d.blockchain.InsertHeadersBeforeCutoff(chunkHeaders[:cutoff]); err != nil {
+							log.Warn("Failed to insert ancient header chain", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
+							return fmt.Errorf("%w: %v", errInvalidChain, err)
+						}
+						log.Debug("Inserted headers before cutoff", "number", chunkHeaders[cutoff-1].Number, "hash", chunkHashes[cutoff-1])
+					}
+					if len(chunkHeaders[cutoff:]) > 0 {
+						if n, err := d.blockchain.InsertHeaderChain(chunkHeaders[cutoff:]); err != nil {
 							log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
 							return fmt.Errorf("%w: %v", errInvalidChain, err)
 						}
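The split hinges on sort.Search returning the first index whose header number reaches the cutoff, so everything before that index lies strictly below the pruning point. A small self-contained sketch with made-up block numbers:

package main

import (
	"fmt"
	"sort"
)

func main() {
	numbers := []uint64{95, 96, 97, 98, 99} // stand-ins for chunkHeaders[i].Number.Uint64()
	cutoffNumber := uint64(97)

	// First index whose number is at or above the cutoff.
	cutoff := sort.Search(len(numbers), func(i int) bool {
		return numbers[i] >= cutoffNumber
	})
	fmt.Println(numbers[:cutoff]) // [95 96]    -> headers only, into the ancient store
	fmt.Println(numbers[cutoff:]) // [97 98 99] -> scheduled for body/receipt retrieval
}

If the whole chunk lies at or above the cutoff, sort.Search returns 0 and the chunk is handled exactly as before.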
@@ -1343,12 +1391,21 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
 					case <-timer.C:
 					}
 				}
-				// Otherwise insert the headers for content retrieval
-				inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
-				if inserts != len(chunkHeaders) {
-					return fmt.Errorf("%w: stale headers", errBadPeer)
+				// Otherwise, schedule the headers for content retrieval (block bodies and
+				// potentially receipts in snap sync).
+				//
+				// Skip the bodies/receipts retrieval scheduling before the cutoff in snap
+				// sync if chain pruning is configured.
+				if mode == SnapSync && cutoff != 0 {
+					chunkHeaders = chunkHeaders[cutoff:]
+					chunkHashes = chunkHashes[cutoff:]
+				}
+				if len(chunkHeaders) > 0 {
+					scheduled = true
+					if d.queue.Schedule(chunkHeaders, chunkHashes, origin+uint64(cutoff)) != len(chunkHeaders) {
+						return fmt.Errorf("%w: stale headers", errBadPeer)
+					}
 				}
-
 				headers = headers[limit:]
 				hashes = hashes[limit:]
 				origin += uint64(limit)
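Advancing the scheduling base by cutoff keeps the header numbering aligned with whatever start number the queue expects: dropping the first cutoff headers of a chunk while moving the base forward by the same amount leaves the offset between the two unchanged. A tiny arithmetic check of that claim, with invented numbers:

package main

import "fmt"

func main() {
	origin := uint64(999)
	numbers := []uint64{1000, 1001, 1002, 1003} // stand-ins for the chunk's header numbers
	cutoff := 2                                 // headers falling below the pruning point

	deltaBefore := numbers[0] - origin
	deltaAfter := numbers[cutoff:][0] - (origin + uint64(cutoff))
	fmt.Println(deltaBefore == deltaAfter) // true: the queue sees a consistent sequence
}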
@@ -1360,11 +1417,13 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
 			}
 			d.syncStatsLock.Unlock()
 
-			// Signal the content downloaders of the availability of new tasks
-			for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
-				select {
-				case ch <- true:
-				default:
+			// Signal the downloader of the availability of new tasks
+			if scheduled {
+				for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
+					select {
+					case ch <- true:
+					default:
+					}
 				}
 			}
 		}
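The wake-up itself is the usual non-blocking notify: a send into a buffered channel guarded by a select with a default branch, so the header processor never stalls if a wake-up is already pending. A standalone sketch of the pattern, with locally declared channels standing in for the queue's:

package main

import "fmt"

func main() {
	blockWakeCh := make(chan bool, 1)   // stand-in for d.queue.blockWakeCh
	receiptWakeCh := make(chan bool, 1) // stand-in for d.queue.receiptWakeCh

	scheduled := true
	if scheduled {
		for _, ch := range []chan bool{blockWakeCh, receiptWakeCh} {
			select {
			case ch <- true:
			default: // a wake-up is already pending; dropping this one is fine
			}
		}
	}
	fmt.Println(len(blockWakeCh), len(receiptWakeCh)) // 1 1
}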
@@ -1724,10 +1783,16 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
 		header = d.blockchain.CurrentHeader()
 		block  = d.blockchain.CurrentSnapBlock()
 	)
-	syncedBlocks := block.Number.Uint64() - d.syncStartBlock
-	if syncedBlocks == 0 {
+	// Prevent reporting noise if the actual chain synchronization (headers
+	// and bodies) hasn't started yet. Inserting the ancient header chain is
+	// fast enough and would introduce significant bias if included in the count.
+	if d.chainCutoffNumber != 0 && block.Number.Uint64() <= d.chainCutoffNumber {
 		return
 	}
+	fetchedBlocks := block.Number.Uint64() - d.syncStartBlock
+	if d.chainCutoffNumber != 0 && d.chainCutoffNumber > d.syncStartBlock {
+		fetchedBlocks = block.Number.Uint64() - d.chainCutoffNumber
+	}
 	// Retrieve the current chain head and calculate the ETA
 	latest, _, _, err := d.skeleton.Bounds()
 	if err != nil {
@@ -1742,7 +1807,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
 	}
 	var (
 		left = latest.Number.Uint64() - block.Number.Uint64()
-		eta  = time.Since(d.syncStartTime) / time.Duration(syncedBlocks) * time.Duration(left)
+		eta  = time.Since(d.syncStartTime) / time.Duration(fetchedBlocks) * time.Duration(left)
 
 		progress = fmt.Sprintf("%.2f%%", float64(block.Number.Uint64())*100/float64(latest.Number.Uint64()))
 		headers  = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString())
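The ETA is the elapsed sync time scaled by the ratio of remaining blocks to blocks fetched so far, where the fetched count now starts at the cutoff rather than the sync start when pruning is configured. A worked example with made-up figures:

package main

import (
	"fmt"
	"time"
)

func main() {
	elapsed := 2 * time.Hour
	fetchedBlocks := uint64(400_000) // blocks pulled since the cutoff (or the sync start)
	left := uint64(600_000)          // blocks remaining to the skeleton head

	eta := elapsed / time.Duration(fetchedBlocks) * time.Duration(left)
	fmt.Println(eta) // 3h0m0s
}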
