@@ -21,6 +21,7 @@ import (
2121 "errors"
2222 "fmt"
2323 "math/big"
24+ "sort"
2425 "sync"
2526 "sync/atomic"
2627 "time"
@@ -121,6 +122,12 @@ type Downloader struct {
121122 committed atomic.Bool
122123 ancientLimit uint64 // The maximum block number which can be regarded as ancient data.
123124
125+ // The cutoff block number and hash before which chain segments (bodies
126+ // and receipts) are skipped during synchronization. 0 means the entire
127+ // chain segment is aimed for synchronization.
128+ chainCutoffNumber uint64
129+ chainCutoffHash common.Hash
130+
124131 // Channels
125132 headerProcCh chan * headerTask // Channel to feed the header processor new tasks
126133
@@ -193,6 +200,10 @@ type BlockChain interface {
193200 // SnapSyncCommitHead directly commits the head block to a certain entity.
194201 SnapSyncCommitHead (common.Hash ) error
195202
203+ // InsertHeadersBeforeCutoff inserts a batch of headers before the configured
204+ // chain cutoff into the ancient store.
205+ InsertHeadersBeforeCutoff ([]* types.Header ) (int , error )
206+
196207 // InsertChain inserts a batch of blocks into the local chain.
197208 InsertChain (types.Blocks ) (int , error )
198209
@@ -205,22 +216,29 @@ type BlockChain interface {
205216 // TrieDB retrieves the low level trie database used for interacting
206217 // with trie nodes.
207218 TrieDB () * triedb.Database
219+
220+ // HistoryPruningCutoff returns the configured history pruning point.
221+ // Block bodies along with the receipts will be skipped for synchronization.
222+ HistoryPruningCutoff () (uint64 , common.Hash )
208223}
209224
210225// New creates a new downloader to fetch hashes and blocks from remote peers.
211226func New (stateDb ethdb.Database , mux * event.TypeMux , chain BlockChain , dropPeer peerDropFn , success func ()) * Downloader {
227+ cutoffNumber , cutoffHash := chain .HistoryPruningCutoff ()
212228 dl := & Downloader {
213- stateDB : stateDb ,
214- mux : mux ,
215- queue : newQueue (blockCacheMaxItems , blockCacheInitialItems ),
216- peers : newPeerSet (),
217- blockchain : chain ,
218- dropPeer : dropPeer ,
219- headerProcCh : make (chan * headerTask , 1 ),
220- quitCh : make (chan struct {}),
221- SnapSyncer : snap .NewSyncer (stateDb , chain .TrieDB ().Scheme ()),
222- stateSyncStart : make (chan * stateSync ),
223- syncStartBlock : chain .CurrentSnapBlock ().Number .Uint64 (),
229+ stateDB : stateDb ,
230+ mux : mux ,
231+ queue : newQueue (blockCacheMaxItems , blockCacheInitialItems ),
232+ peers : newPeerSet (),
233+ blockchain : chain ,
234+ chainCutoffNumber : cutoffNumber ,
235+ chainCutoffHash : cutoffHash ,
236+ dropPeer : dropPeer ,
237+ headerProcCh : make (chan * headerTask , 1 ),
238+ quitCh : make (chan struct {}),
239+ SnapSyncer : snap .NewSyncer (stateDb , chain .TrieDB ().Scheme ()),
240+ stateSyncStart : make (chan * stateSync ),
241+ syncStartBlock : chain .CurrentSnapBlock ().Number .Uint64 (),
224242 }
225243 // Create the post-merge skeleton syncer and start the process
226244 dl .skeleton = newSkeleton (stateDb , dl .peers , dropPeer , newBeaconBackfiller (dl , success ))
@@ -599,6 +617,12 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
599617 d .ancientLimit = 0
600618 }
601619 }
620+ // Extend the ancient chain segment range if the ancient limit is even
621+ // below the pre-configured chain cutoff.
622+ if d .chainCutoffNumber != 0 && d .chainCutoffNumber > d .ancientLimit {
623+ d .ancientLimit = d .chainCutoffNumber
624+ log .Info ("Extend the ancient range with configured cutoff" , "cutoff" , d .chainCutoffNumber )
625+ }
602626 frozen , _ := d .stateDB .Ancients () // Ignore the error here since light client can also hit here.
603627
604628 // If a part of blockchain data has already been written into active store,
@@ -617,8 +641,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
617641 log .Info ("Truncated excess ancient chain segment" , "oldhead" , frozen - 1 , "newhead" , origin )
618642 }
619643 }
644+ // Skip ancient chain segments if Geth is running with a configured chain cutoff.
645+ // These segments are not guaranteed to be available in the network.
646+ chainOffset := origin + 1
647+ if mode == SnapSync && d .chainCutoffNumber != 0 {
648+ if chainOffset < d .chainCutoffNumber {
649+ chainOffset = d .chainCutoffNumber
650+ log .Info ("Skip chain segment before cutoff" , "origin" , origin , "cutoff" , d .chainCutoffNumber )
651+ }
652+ }
620653 // Initiate the sync using a concurrent header and content retrieval algorithm
621- d .queue .Prepare (origin + 1 , mode )
654+ d .queue .Prepare (chainOffset , mode )
622655 if d .syncInitHook != nil {
623656 d .syncInitHook (origin , height )
624657 }
@@ -632,8 +665,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
632665 }
633666 fetchers := []func () error {
634667 headerFetcher , // Headers are always retrieved
635- func () error { return d .fetchBodies (origin + 1 , beaconMode ) }, // Bodies are retrieved during normal and snap sync
636- func () error { return d .fetchReceipts (origin + 1 , beaconMode ) }, // Receipts are retrieved during snap sync
668+ func () error { return d .fetchBodies (chainOffset , beaconMode ) }, // Bodies are retrieved during normal and snap sync
669+ func () error { return d .fetchReceipts (chainOffset , beaconMode ) }, // Receipts are retrieved during snap sync
637670 func () error { return d .processHeaders (origin + 1 , td , ttd , beaconMode ) },
638671 }
639672 if mode == SnapSync {
@@ -1307,7 +1340,7 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
13071340 return nil
13081341 }
13091342 // Otherwise split the chunk of headers into batches and process them
1310- headers , hashes := task .headers , task .hashes
1343+ headers , hashes , scheduled := task .headers , task .hashes , false
13111344
13121345 gotHeaders = true
13131346 for len (headers ) > 0 {
@@ -1325,10 +1358,25 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
13251358 chunkHeaders := headers [:limit ]
13261359 chunkHashes := hashes [:limit ]
13271360
1328- // In case of header only syncing, validate the chunk immediately
1361+ // Split the headers around the chain cutoff
1362+ var cutoff int
1363+ if mode == SnapSync && d .chainCutoffNumber != 0 {
1364+ cutoff = sort .Search (len (chunkHeaders ), func (i int ) bool {
1365+ return chunkHeaders [i ].Number .Uint64 () >= d .chainCutoffNumber
1366+ })
1367+ }
1368+ // Insert the header chain into the ancient store (with block bodies and
1369+ // receipts set to nil) if they fall before the cutoff.
13291370 if mode == SnapSync {
1330- if len (chunkHeaders ) > 0 {
1331- if n , err := d .blockchain .InsertHeaderChain (chunkHeaders ); err != nil {
1371+ if cutoff != 0 {
1372+ if n , err := d .blockchain .InsertHeadersBeforeCutoff (chunkHeaders [:cutoff ]); err != nil {
1373+ log .Warn ("Failed to insert ancient header chain" , "number" , chunkHeaders [n ].Number , "hash" , chunkHashes [n ], "parent" , chunkHeaders [n ].ParentHash , "err" , err )
1374+ return fmt .Errorf ("%w: %v" , errInvalidChain , err )
1375+ }
1376+ log .Debug ("Inserted headers before cutoff" , "number" , chunkHeaders [cutoff - 1 ].Number , "hash" , chunkHashes [cutoff - 1 ])
1377+ }
1378+ if len (chunkHeaders [cutoff :]) > 0 {
1379+ if n , err := d .blockchain .InsertHeaderChain (chunkHeaders [cutoff :]); err != nil {
13321380 log .Warn ("Invalid header encountered" , "number" , chunkHeaders [n ].Number , "hash" , chunkHashes [n ], "parent" , chunkHeaders [n ].ParentHash , "err" , err )
13331381 return fmt .Errorf ("%w: %v" , errInvalidChain , err )
13341382 }
@@ -1343,12 +1391,21 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
13431391 case <- timer .C :
13441392 }
13451393 }
1346- // Otherwise insert the headers for content retrieval
1347- inserts := d .queue .Schedule (chunkHeaders , chunkHashes , origin )
1348- if inserts != len (chunkHeaders ) {
1349- return fmt .Errorf ("%w: stale headers" , errBadPeer )
1394+ // Otherwise, schedule the headers for content retrieval (block bodies and
1395+ // potentially receipts in snap sync).
1396+ //
1397+ // Skip the bodies/receipts retrieval scheduling before the cutoff in snap
1398+ // sync if chain pruning is configured.
1399+ if mode == SnapSync && cutoff != 0 {
1400+ chunkHeaders = chunkHeaders [cutoff :]
1401+ chunkHashes = chunkHashes [cutoff :]
1402+ }
1403+ if len (chunkHeaders ) > 0 {
1404+ scheduled = true
1405+ if d .queue .Schedule (chunkHeaders , chunkHashes , origin + uint64 (cutoff )) != len (chunkHeaders ) {
1406+ return fmt .Errorf ("%w: stale headers" , errBadPeer )
1407+ }
13501408 }
1351-
13521409 headers = headers [limit :]
13531410 hashes = hashes [limit :]
13541411 origin += uint64 (limit )
@@ -1360,11 +1417,13 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
13601417 }
13611418 d .syncStatsLock .Unlock ()
13621419
1363- // Signal the content downloaders of the availability of new tasks
1364- for _ , ch := range []chan bool {d .queue .blockWakeCh , d .queue .receiptWakeCh } {
1365- select {
1366- case ch <- true :
1367- default :
1420+ // Signal the downloader of the availability of new tasks
1421+ if scheduled {
1422+ for _ , ch := range []chan bool {d .queue .blockWakeCh , d .queue .receiptWakeCh } {
1423+ select {
1424+ case ch <- true :
1425+ default :
1426+ }
13681427 }
13691428 }
13701429 }
@@ -1724,10 +1783,20 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
17241783 header = d .blockchain .CurrentHeader ()
17251784 block = d .blockchain .CurrentSnapBlock ()
17261785 )
1727- syncedBlocks := block . Number . Uint64 () - d . syncStartBlock
1728- if syncedBlocks == 0 {
1786+ // Prevent reporting if nothing has been synchronized yet
1787+ if block . Number . Uint64 () <= d . syncStartBlock {
17291788 return
17301789 }
1790+ // Prevent reporting noise if the actual chain synchronization (headers
1791+ // and bodies) hasn't started yet. Inserting the ancient header chain is
1792+ // fast enough and would introduce significant bias if included in the count.
1793+ if d .chainCutoffNumber != 0 && block .Number .Uint64 () <= d .chainCutoffNumber {
1794+ return
1795+ }
1796+ fetchedBlocks := block .Number .Uint64 () - d .syncStartBlock
1797+ if d .chainCutoffNumber != 0 && d .chainCutoffNumber > d .syncStartBlock {
1798+ fetchedBlocks = block .Number .Uint64 () - d .chainCutoffNumber
1799+ }
17311800 // Retrieve the current chain head and calculate the ETA
17321801 latest , _ , _ , err := d .skeleton .Bounds ()
17331802 if err != nil {
@@ -1742,7 +1811,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
17421811 }
17431812 var (
17441813 left = latest .Number .Uint64 () - block .Number .Uint64 ()
1745- eta = time .Since (d .syncStartTime ) / time .Duration (syncedBlocks ) * time .Duration (left )
1814+ eta = time .Since (d .syncStartTime ) / time .Duration (fetchedBlocks ) * time .Duration (left )
17461815
17471816 progress = fmt .Sprintf ("%.2f%%" , float64 (block .Number .Uint64 ())* 100 / float64 (latest .Number .Uint64 ()))
17481817 headers = fmt .Sprintf ("%v@%v" , log .FormatLogfmtUint64 (header .Number .Uint64 ()), common .StorageSize (headerBytes ).TerminalString ())
0 commit comments