Skip to content

Commit 0ab48f7

Browse files
committed
eth: integrate syncmode into downloader
1 parent ca91254 commit 0ab48f7

File tree

12 files changed

+197
-161
lines changed

12 files changed

+197
-161
lines changed

eth/backend.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -591,29 +591,3 @@ func (s *Ethereum) Stop() error {
591591

592592
return nil
593593
}
594-
595-
// SyncMode retrieves the current sync mode, either explicitly set, or derived
596-
// from the chain status.
597-
func (s *Ethereum) SyncMode() ethconfig.SyncMode {
598-
// If we're in snap sync mode, return that directly
599-
if s.handler.snapSync.Load() {
600-
return ethconfig.SnapSync
601-
}
602-
// We are probably in full sync, but we might have rewound to before the
603-
// snap sync pivot, check if we should re-enable snap sync.
604-
head := s.blockchain.CurrentBlock()
605-
if pivot := rawdb.ReadLastPivotNumber(s.chainDb); pivot != nil {
606-
if head.Number.Uint64() < *pivot {
607-
return ethconfig.SnapSync
608-
}
609-
}
610-
// We are in a full sync, but the associated head state is missing. To complete
611-
// the head state, forcefully rerun the snap sync. Note it doesn't mean the
612-
// persistent state is corrupted, just mismatch with the head block.
613-
if !s.blockchain.HasState(head.Root) {
614-
log.Info("Reenabled snap sync as chain is stateless")
615-
return ethconfig.SnapSync
616-
}
617-
// Nope, we're really full syncing
618-
return ethconfig.FullSync
619-
}

eth/catalyst/api.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
273273
}
274274
}
275275
log.Info("Forkchoice requested sync to new head", context...)
276-
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil {
276+
if err := api.eth.Downloader().BeaconSync(header, finalized); err != nil {
277277
return engine.STATUS_SYNCING, err
278278
}
279279
return engine.STATUS_SYNCING, nil
@@ -747,7 +747,7 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
747747
// tries to make it import a block. That should be denied as pushing something
748748
// into the database directly will conflict with the assumptions of snap sync
749749
// that it has an empty db that it can fill itself.
750-
if api.eth.SyncMode() != ethconfig.FullSync {
750+
if api.eth.Downloader().GetSyncMode() == ethconfig.SnapSync {
751751
return api.delayPayloadImport(block), nil
752752
}
753753
if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
@@ -795,7 +795,7 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) engine.PayloadSt
795795
// Although we don't want to trigger a sync, if there is one already in
796796
// progress, try to extend it with the current payload request to relieve
797797
// some strain from the forkchoice update.
798-
err := api.eth.Downloader().BeaconExtend(api.eth.SyncMode(), block.Header())
798+
err := api.eth.Downloader().BeaconExtend(block.Header())
799799
if err == nil {
800800
log.Debug("Payload accepted for sync extension", "number", block.NumberU64(), "hash", block.Hash())
801801
return engine.PayloadStatusV1{Status: engine.SYNCING}
@@ -804,7 +804,7 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) engine.PayloadSt
804804
// payload as non-integratable on top of the existing sync. We'll just
805805
// have to rely on the beacon client to forcefully update the head with
806806
// a forkchoice update request.
807-
if api.eth.SyncMode() == ethconfig.FullSync {
807+
if api.eth.Downloader().GetSyncMode() == ethconfig.FullSync {
808808
// In full sync mode, failure to import a well-formed block can only mean
809809
// that the parent state is missing and the syncer rejected extending the
810810
// current cycle with the new payload.

eth/downloader/beacondevsync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ import (
3333
// Note, this must not be used in live code. If the forkchcoice endpoint where
3434
// to use this instead of giving us the payload first, then essentially nobody
3535
// in the network would have the block yet that we'd attempt to retrieve.
36-
func (d *Downloader) BeaconDevSync(mode SyncMode, header *types.Header) error {
36+
func (d *Downloader) BeaconDevSync(header *types.Header) error {
3737
// Be very loud that this code should not be used in a live node
3838
log.Warn("----------------------------------")
3939
log.Warn("Beacon syncing with hash as target", "number", header.Number, "hash", header.Hash())
4040
log.Warn("This is unhealthy for a live node!")
4141
log.Warn("This is incompatible with the consensus layer!")
4242
log.Warn("----------------------------------")
43-
return d.BeaconSync(mode, header, header)
43+
return d.BeaconSync(header, header)
4444
}
4545

4646
// GetHeader tries to retrieve the header with a given hash from a random peer.

eth/downloader/beaconsync.go

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
// directed by the skeleton sync's head/tail events.
3535
type beaconBackfiller struct {
3636
downloader *Downloader // Downloader to direct via this callback implementation
37-
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
3837
success func() // Callback to run on successful sync cycle completion
3938
filling bool // Flag whether the downloader is backfilling or not
4039
filled *types.Header // Last header filled by the last terminated sync loop
@@ -92,7 +91,6 @@ func (b *beaconBackfiller) resume() {
9291
b.filling = true
9392
b.filled = nil
9493
b.started = make(chan struct{})
95-
mode := b.syncMode
9694
b.lock.Unlock()
9795

9896
// Start the backfilling on its own thread since the downloader does not have
@@ -107,7 +105,7 @@ func (b *beaconBackfiller) resume() {
107105
}()
108106
// If the downloader fails, report an error as in beacon chain mode there
109107
// should be no errors as long as the chain we're syncing to is valid.
110-
if err := b.downloader.synchronise(mode, b.started); err != nil {
108+
if err := b.downloader.synchronise(b.started); err != nil {
111109
log.Error("Beacon backfilling failed", "err", err)
112110
return
113111
}
@@ -119,27 +117,6 @@ func (b *beaconBackfiller) resume() {
119117
}()
120118
}
121119

122-
// setMode updates the sync mode from the current one to the requested one. If
123-
// there's an active sync in progress, it will be cancelled and restarted.
124-
func (b *beaconBackfiller) setMode(mode SyncMode) {
125-
// Update the old sync mode and track if it was changed
126-
b.lock.Lock()
127-
oldMode := b.syncMode
128-
updated := oldMode != mode
129-
filling := b.filling
130-
b.syncMode = mode
131-
b.lock.Unlock()
132-
133-
// If the sync mode was changed mid-sync, restart. This should never ever
134-
// really happen, we just handle it to detect programming errors.
135-
if !updated || !filling {
136-
return
137-
}
138-
log.Error("Downloader sync mode changed mid-run", "old", oldMode.String(), "new", mode.String())
139-
b.suspend()
140-
b.resume()
141-
}
142-
143120
// SetBadBlockCallback sets the callback to run when a bad block is hit by the
144121
// block processor. This method is not thread safe and should be set only once
145122
// on startup before system events are fired.
@@ -153,8 +130,8 @@ func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
153130
//
154131
// Internally backfilling and state sync is done the same way, but the header
155132
// retrieval and scheduling is replaced.
156-
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.Header) error {
157-
return d.beaconSync(mode, head, final, true)
133+
func (d *Downloader) BeaconSync(head *types.Header, final *types.Header) error {
134+
return d.beaconSync(head, final, true)
158135
}
159136

160137
// BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
@@ -163,8 +140,8 @@ func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.
163140
//
164141
// This is useful if a beacon client is feeding us large chunks of payloads to run,
165142
// but is not setting the head after each.
166-
func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
167-
return d.beaconSync(mode, head, nil, false)
143+
func (d *Downloader) BeaconExtend(head *types.Header) error {
144+
return d.beaconSync(head, nil, false)
168145
}
169146

170147
// beaconSync is the post-merge version of the chain synchronization, where the
@@ -173,20 +150,9 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
173150
//
174151
// Internally backfilling and state sync is done the same way, but the header
175152
// retrieval and scheduling is replaced.
176-
func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, final *types.Header, force bool) error {
177-
// When the downloader starts a sync cycle, it needs to be aware of the sync
178-
// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
179-
// mode into the backfiller directly.
180-
//
181-
// Super crazy dangerous type cast. Should be fine (TM), we're only using a
182-
// different backfiller implementation for skeleton tests.
183-
d.skeleton.filler.(*beaconBackfiller).setMode(mode)
184-
153+
func (d *Downloader) beaconSync(head *types.Header, final *types.Header, force bool) error {
185154
// Signal the skeleton sync to switch to a new head, however it wants
186-
if err := d.skeleton.Sync(head, final, force); err != nil {
187-
return err
188-
}
189-
return nil
155+
return d.skeleton.Sync(head, final, force)
190156
}
191157

192158
// findBeaconAncestor tries to locate the common ancestor link of the local chain

eth/downloader/downloader.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,9 @@ type headerTask struct {
9797
}
9898

9999
type Downloader struct {
100-
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
101-
mux *event.TypeMux // Event multiplexer to announce sync operation events
100+
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
101+
moder *moder // Sync mode management, deliver the appropriate sync mode choice for each cycle
102+
mux *event.TypeMux // Event multiplexer to announce sync operation events
102103

103104
queue *queue // Scheduler for selecting the hashes to download
104105
peers *peerSet // Set of active peers from which download can proceed
@@ -165,6 +166,9 @@ type BlockChain interface {
165166
// HasHeader verifies a header's presence in the local chain.
166167
HasHeader(common.Hash, uint64) bool
167168

169+
// HasState checks if state trie is fully present in the database or not.
170+
HasState(root common.Hash) bool
171+
168172
// GetHeaderByHash retrieves a header from the local chain.
169173
GetHeaderByHash(common.Hash) *types.Header
170174

@@ -221,10 +225,11 @@ type BlockChain interface {
221225
}
222226

223227
// New creates a new downloader to fetch hashes and blocks from remote peers.
224-
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
228+
func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
225229
cutoffNumber, cutoffHash := chain.HistoryPruningCutoff()
226230
dl := &Downloader{
227231
stateDB: stateDb,
232+
moder: newSyncModer(mode, chain, stateDb),
228233
mux: mux,
229234
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
230235
peers: newPeerSet(),
@@ -331,7 +336,7 @@ func (d *Downloader) UnregisterPeer(id string) error {
331336
// synchronise will select the peer and use it for synchronising. If an empty string is given
332337
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
333338
// checks fail an error will be returned. This method is synchronous
334-
func (d *Downloader) synchronise(mode SyncMode, beaconPing chan struct{}) error {
339+
func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) {
335340
// The beacon header syncer is async. It will start this synchronization and
336341
// will continue doing other tasks. However, if synchronization needs to be
337342
// cancelled, the syncer needs to know if we reached the startup point (and
@@ -356,6 +361,13 @@ func (d *Downloader) synchronise(mode SyncMode, beaconPing chan struct{}) error
356361
if d.notified.CompareAndSwap(false, true) {
357362
log.Info("Block synchronisation started")
358363
}
364+
mode := d.moder.getMode()
365+
defer func() {
366+
if err == nil && mode == ethconfig.SnapSync {
367+
d.moder.disableSnap()
368+
log.Info("Disabled the snap sync after the initial sync cycle")
369+
}
370+
}()
359371
if mode == ethconfig.SnapSync {
360372
// Snap sync will directly modify the persistent state, making the entire
361373
// trie database unusable until the state is fully synced. To prevent any
@@ -399,17 +411,24 @@ func (d *Downloader) synchronise(mode SyncMode, beaconPing chan struct{}) error
399411

400412
// Atomically set the requested sync mode
401413
d.mode.Store(uint32(mode))
414+
defer d.mode.Store(0)
402415

403416
if beaconPing != nil {
404417
close(beaconPing)
405418
}
406419
return d.syncToHead()
407420
}
408421

422+
// getMode returns the sync mode used within current cycle.
409423
func (d *Downloader) getMode() SyncMode {
410424
return SyncMode(d.mode.Load())
411425
}
412426

427+
// GetSyncMode returns the suggested sync mode.
428+
func (d *Downloader) GetSyncMode() SyncMode {
429+
return d.moder.getMode()
430+
}
431+
413432
// syncToHead starts a block synchronization based on the hash chain from
414433
// the specified head hash.
415434
func (d *Downloader) syncToHead() (err error) {

0 commit comments

Comments
 (0)