Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,29 +591,3 @@ func (s *Ethereum) Stop() error {

return nil
}

// SyncMode retrieves the current sync mode, either explicitly set, or derived
// from the chain status.
func (s *Ethereum) SyncMode() ethconfig.SyncMode {
// If we're in snap sync mode, return that directly
if s.handler.snapSync.Load() {
return ethconfig.SnapSync
}
// We are probably in full sync, but we might have rewound to before the
// snap sync pivot, check if we should re-enable snap sync.
head := s.blockchain.CurrentBlock()
if pivot := rawdb.ReadLastPivotNumber(s.chainDb); pivot != nil {
if head.Number.Uint64() < *pivot {
return ethconfig.SnapSync
}
}
// We are in a full sync, but the associated head state is missing. To complete
// the head state, forcefully rerun the snap sync. Note it doesn't mean the
// persistent state is corrupted, just mismatch with the head block.
if !s.blockchain.HasState(head.Root) {
log.Info("Reenabled snap sync as chain is stateless")
return ethconfig.SnapSync
}
// Nope, we're really full syncing
return ethconfig.FullSync
}
8 changes: 4 additions & 4 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
}
}
log.Info("Forkchoice requested sync to new head", context...)
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil {
if err := api.eth.Downloader().BeaconSync(header, finalized); err != nil {
return engine.STATUS_SYNCING, err
}
return engine.STATUS_SYNCING, nil
Expand Down Expand Up @@ -747,7 +747,7 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
// tries to make it import a block. That should be denied as pushing something
// into the database directly will conflict with the assumptions of snap sync
// that it has an empty db that it can fill itself.
if api.eth.SyncMode() != ethconfig.FullSync {
if api.eth.Downloader().GetSyncMode() == ethconfig.SnapSync {
return api.delayPayloadImport(block), nil
}
if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
Expand Down Expand Up @@ -795,7 +795,7 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) engine.PayloadSt
// Although we don't want to trigger a sync, if there is one already in
// progress, try to extend it with the current payload request to relieve
// some strain from the forkchoice update.
err := api.eth.Downloader().BeaconExtend(api.eth.SyncMode(), block.Header())
err := api.eth.Downloader().BeaconExtend(block.Header())
if err == nil {
log.Debug("Payload accepted for sync extension", "number", block.NumberU64(), "hash", block.Hash())
return engine.PayloadStatusV1{Status: engine.SYNCING}
Expand All @@ -804,7 +804,7 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) engine.PayloadSt
// payload as non-integratable on top of the existing sync. We'll just
// have to rely on the beacon client to forcefully update the head with
// a forkchoice update request.
if api.eth.SyncMode() == ethconfig.FullSync {
if api.eth.Downloader().GetSyncMode() == ethconfig.FullSync {
// In full sync mode, failure to import a well-formed block can only mean
// that the parent state is missing and the syncer rejected extending the
// current cycle with the new payload.
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/beacondevsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ import (
// Note, this must not be used in live code. If the forkchcoice endpoint where
// to use this instead of giving us the payload first, then essentially nobody
// in the network would have the block yet that we'd attempt to retrieve.
func (d *Downloader) BeaconDevSync(mode SyncMode, header *types.Header) error {
func (d *Downloader) BeaconDevSync(header *types.Header) error {
// Be very loud that this code should not be used in a live node
log.Warn("----------------------------------")
log.Warn("Beacon syncing with hash as target", "number", header.Number, "hash", header.Hash())
log.Warn("This is unhealthy for a live node!")
log.Warn("This is incompatible with the consensus layer!")
log.Warn("----------------------------------")
return d.BeaconSync(mode, header, header)
return d.BeaconSync(header, header)
}

// GetHeader tries to retrieve the header with a given hash from a random peer.
Expand Down
48 changes: 7 additions & 41 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
// directed by the skeleton sync's head/tail events.
type beaconBackfiller struct {
downloader *Downloader // Downloader to direct via this callback implementation
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
filled *types.Header // Last header filled by the last terminated sync loop
Expand Down Expand Up @@ -92,7 +91,6 @@ func (b *beaconBackfiller) resume() {
b.filling = true
b.filled = nil
b.started = make(chan struct{})
mode := b.syncMode
b.lock.Unlock()

// Start the backfilling on its own thread since the downloader does not have
Expand All @@ -107,7 +105,7 @@ func (b *beaconBackfiller) resume() {
}()
// If the downloader fails, report an error as in beacon chain mode there
// should be no errors as long as the chain we're syncing to is valid.
if err := b.downloader.synchronise(mode, b.started); err != nil {
if err := b.downloader.synchronise(b.started); err != nil {
log.Error("Beacon backfilling failed", "err", err)
return
}
Expand All @@ -119,27 +117,6 @@ func (b *beaconBackfiller) resume() {
}()
}

// setMode updates the sync mode from the current one to the requested one. If
// there's an active sync in progress, it will be cancelled and restarted.
func (b *beaconBackfiller) setMode(mode SyncMode) {
// Update the old sync mode and track if it was changed
b.lock.Lock()
oldMode := b.syncMode
updated := oldMode != mode
filling := b.filling
b.syncMode = mode
b.lock.Unlock()

// If the sync mode was changed mid-sync, restart. This should never ever
// really happen, we just handle it to detect programming errors.
if !updated || !filling {
return
}
log.Error("Downloader sync mode changed mid-run", "old", oldMode.String(), "new", mode.String())
b.suspend()
b.resume()
}

// SetBadBlockCallback sets the callback to run when a bad block is hit by the
// block processor. This method is not thread safe and should be set only once
// on startup before system events are fired.
Expand All @@ -153,8 +130,8 @@ func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
//
// Internally backfilling and state sync is done the same way, but the header
// retrieval and scheduling is replaced.
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.Header) error {
return d.beaconSync(mode, head, final, true)
func (d *Downloader) BeaconSync(head *types.Header, final *types.Header) error {
return d.beaconSync(head, final, true)
}

// BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
Expand All @@ -163,8 +140,8 @@ func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.
//
// This is useful if a beacon client is feeding us large chunks of payloads to run,
// but is not setting the head after each.
func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
return d.beaconSync(mode, head, nil, false)
func (d *Downloader) BeaconExtend(head *types.Header) error {
return d.beaconSync(head, nil, false)
}

// beaconSync is the post-merge version of the chain synchronization, where the
Expand All @@ -173,20 +150,9 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
//
// Internally backfilling and state sync is done the same way, but the header
// retrieval and scheduling is replaced.
func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, final *types.Header, force bool) error {
// When the downloader starts a sync cycle, it needs to be aware of the sync
// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
// mode into the backfiller directly.
//
// Super crazy dangerous type cast. Should be fine (TM), we're only using a
// different backfiller implementation for skeleton tests.
d.skeleton.filler.(*beaconBackfiller).setMode(mode)

func (d *Downloader) beaconSync(head *types.Header, final *types.Header, force bool) error {
// Signal the skeleton sync to switch to a new head, however it wants
if err := d.skeleton.Sync(head, final, force); err != nil {
return err
}
return nil
return d.skeleton.Sync(head, final, force)
}

// findBeaconAncestor tries to locate the common ancestor link of the local chain
Expand Down
27 changes: 23 additions & 4 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ type headerTask struct {
}

type Downloader struct {
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
moder *moder // Sync mode management, deliver the appropriate sync mode choice for each cycle
mux *event.TypeMux // Event multiplexer to announce sync operation events

queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
Expand Down Expand Up @@ -165,6 +166,9 @@ type BlockChain interface {
// HasHeader verifies a header's presence in the local chain.
HasHeader(common.Hash, uint64) bool

// HasState checks if state trie is fully present in the database or not.
HasState(root common.Hash) bool

// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash(common.Hash) *types.Header

Expand Down Expand Up @@ -221,10 +225,11 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
cutoffNumber, cutoffHash := chain.HistoryPruningCutoff()
dl := &Downloader{
stateDB: stateDb,
moder: newSyncModer(mode, chain, stateDb),
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
Expand Down Expand Up @@ -331,7 +336,7 @@ func (d *Downloader) UnregisterPeer(id string) error {
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) synchronise(mode SyncMode, beaconPing chan struct{}) error {
func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) {
// The beacon header syncer is async. It will start this synchronization and
// will continue doing other tasks. However, if synchronization needs to be
// cancelled, the syncer needs to know if we reached the startup point (and
Expand All @@ -356,6 +361,13 @@ func (d *Downloader) synchronise(mode SyncMode, beaconPing chan struct{}) error
if d.notified.CompareAndSwap(false, true) {
log.Info("Block synchronisation started")
}
mode := d.moder.getMode()
defer func() {
if err == nil && mode == ethconfig.SnapSync {
d.moder.disableSnap()
log.Info("Disabled the snap sync after the initial sync cycle")
}
}()
if mode == ethconfig.SnapSync {
// Snap sync will directly modify the persistent state, making the entire
// trie database unusable until the state is fully synced. To prevent any
Expand Down Expand Up @@ -399,17 +411,24 @@ func (d *Downloader) synchronise(mode SyncMode, beaconPing chan struct{}) error

// Atomically set the requested sync mode
d.mode.Store(uint32(mode))
defer d.mode.Store(0)

if beaconPing != nil {
close(beaconPing)
}
return d.syncToHead()
}

// getMode returns the sync mode used within current cycle.
func (d *Downloader) getMode() SyncMode {
return SyncMode(d.mode.Load())
}

// GetSyncMode returns the suggested sync mode.
func (d *Downloader) GetSyncMode() SyncMode {
return d.moder.getMode()
}

// syncToHead starts a block synchronization based on the hash chain from
// the specified head hash.
func (d *Downloader) syncToHead() (err error) {
Expand Down
Loading