diff --git a/eth/backend.go b/eth/backend.go
index 85095618222..3627fb887e7 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -591,29 +591,3 @@ func (s *Ethereum) Stop() error {
 
 	return nil
 }
-
-// SyncMode retrieves the current sync mode, either explicitly set, or derived
-// from the chain status.
-func (s *Ethereum) SyncMode() ethconfig.SyncMode {
-	// If we're in snap sync mode, return that directly
-	if s.handler.snapSync.Load() {
-		return ethconfig.SnapSync
-	}
-	// We are probably in full sync, but we might have rewound to before the
-	// snap sync pivot, check if we should re-enable snap sync.
-	head := s.blockchain.CurrentBlock()
-	if pivot := rawdb.ReadLastPivotNumber(s.chainDb); pivot != nil {
-		if head.Number.Uint64() < *pivot {
-			return ethconfig.SnapSync
-		}
-	}
-	// We are in a full sync, but the associated head state is missing. To complete
-	// the head state, forcefully rerun the snap sync. Note it doesn't mean the
-	// persistent state is corrupted, just mismatch with the head block.
-	if !s.blockchain.HasState(head.Root) {
-		log.Info("Reenabled snap sync as chain is stateless")
-		return ethconfig.SnapSync
-	}
-	// Nope, we're really full syncing
-	return ethconfig.FullSync
-}
diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go
index 75b263bf6b6..b33f6ceff24 100644
--- a/eth/catalyst/api.go
+++ b/eth/catalyst/api.go
@@ -273,7 +273,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
 		}
 	}
 	log.Info("Forkchoice requested sync to new head", context...)
-	if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil {
+	if err := api.eth.Downloader().BeaconSync(header, finalized); err != nil {
 		return engine.STATUS_SYNCING, err
 	}
 	return engine.STATUS_SYNCING, nil
@@ -747,7 +747,7 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
 	// tries to make it import a block. That should be denied as pushing something
 	// into the database directly will conflict with the assumptions of snap sync
 	// that it has an empty db that it can fill itself.
-	if api.eth.SyncMode() != ethconfig.FullSync {
+	if api.eth.Downloader().GetSyncMode() == ethconfig.SnapSync {
 		return api.delayPayloadImport(block), nil
 	}
 	if !api.eth.BlockChain().HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
@@ -795,7 +795,7 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) engine.PayloadSt
 	// Although we don't want to trigger a sync, if there is one already in
 	// progress, try to extend it with the current payload request to relieve
 	// some strain from the forkchoice update.
-	err := api.eth.Downloader().BeaconExtend(api.eth.SyncMode(), block.Header())
+	err := api.eth.Downloader().BeaconExtend(block.Header())
 	if err == nil {
 		log.Debug("Payload accepted for sync extension", "number", block.NumberU64(), "hash", block.Hash())
 		return engine.PayloadStatusV1{Status: engine.SYNCING}
@@ -804,7 +804,7 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) engine.PayloadSt
 	// payload as non-integratable on top of the existing sync. We'll just
 	// have to rely on the beacon client to forcefully update the head with
 	// a forkchoice update request.
-	if api.eth.SyncMode() == ethconfig.FullSync {
+	if api.eth.Downloader().GetSyncMode() == ethconfig.FullSync {
 		// In full sync mode, failure to import a well-formed block can only mean
 		// that the parent state is missing and the syncer rejected extending the
 		// current cycle with the new payload.
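
For context, the mode gate that newPayload performs after this change can be read in isolation. The sketch below is illustrative only (the shouldDelayImport helper is hypothetical and not part of the patch): with Ethereum.SyncMode gone, callers ask the downloader directly, and only an active snap sync forces the payload onto the delayed-import path.

    package enginesketch // illustration only, assumes go-ethereum with this patch applied

    import (
    	"github.com/ethereum/go-ethereum/eth/downloader"
    	"github.com/ethereum/go-ethereum/eth/ethconfig"
    )

    // shouldDelayImport mirrors the gate in newPayload above: snap sync owns the
    // database wholesale, so importing blocks directly would conflict with its
    // assumption of an empty store it can fill itself.
    func shouldDelayImport(dl *downloader.Downloader) bool {
    	return dl.GetSyncMode() == ethconfig.SnapSync
    }
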
diff --git a/eth/downloader/beacondevsync.go b/eth/downloader/beacondevsync.go
index 03f17b1a52c..52e43f86b43 100644
--- a/eth/downloader/beacondevsync.go
+++ b/eth/downloader/beacondevsync.go
@@ -33,14 +33,14 @@ import (
 // Note, this must not be used in live code. If the forkchoice endpoint were
 // to use this instead of giving us the payload first, then essentially nobody
 // in the network would have the block yet that we'd attempt to retrieve.
-func (d *Downloader) BeaconDevSync(mode SyncMode, header *types.Header) error {
+func (d *Downloader) BeaconDevSync(header *types.Header) error {
 	// Be very loud that this code should not be used in a live node
 	log.Warn("----------------------------------")
 	log.Warn("Beacon syncing with hash as target", "number", header.Number, "hash", header.Hash())
 	log.Warn("This is unhealthy for a live node!")
 	log.Warn("This is incompatible with the consensus layer!")
 	log.Warn("----------------------------------")
-	return d.BeaconSync(mode, header, header)
+	return d.BeaconSync(header, header)
 }
 
 // GetHeader tries to retrieve the header with a given hash from a random peer.
diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go
index 12b74a1ba98..405643e576f 100644
--- a/eth/downloader/beaconsync.go
+++ b/eth/downloader/beaconsync.go
@@ -34,7 +34,6 @@ import (
 // directed by the skeleton sync's head/tail events.
 type beaconBackfiller struct {
 	downloader *Downloader   // Downloader to direct via this callback implementation
-	syncMode   SyncMode      // Sync mode to use for backfilling the skeleton chains
 	success    func()        // Callback to run on successful sync cycle completion
 	filling    bool          // Flag whether the downloader is backfilling or not
 	filled     *types.Header // Last header filled by the last terminated sync loop
@@ -92,7 +91,6 @@ func (b *beaconBackfiller) resume() {
 	b.filling = true
 	b.filled = nil
 	b.started = make(chan struct{})
-	mode := b.syncMode
 	b.lock.Unlock()
 
 	// Start the backfilling on its own thread since the downloader does not have
@@ -107,7 +105,7 @@ func (b *beaconBackfiller) resume() {
 		}()
 		// If the downloader fails, report an error as in beacon chain mode there
 		// should be no errors as long as the chain we're syncing to is valid.
-		if err := b.downloader.synchronise(mode, b.started); err != nil {
+		if err := b.downloader.synchronise(b.started); err != nil {
 			log.Error("Beacon backfilling failed", "err", err)
 			return
 		}
@@ -119,27 +117,6 @@ func (b *beaconBackfiller) resume() {
 	}()
 }
 
-// setMode updates the sync mode from the current one to the requested one. If
-// there's an active sync in progress, it will be cancelled and restarted.
-func (b *beaconBackfiller) setMode(mode SyncMode) {
-	// Update the old sync mode and track if it was changed
-	b.lock.Lock()
-	oldMode := b.syncMode
-	updated := oldMode != mode
-	filling := b.filling
-	b.syncMode = mode
-	b.lock.Unlock()
-
-	// If the sync mode was changed mid-sync, restart. This should never ever
-	// really happen, we just handle it to detect programming errors.
-	if !updated || !filling {
-		return
-	}
-	log.Error("Downloader sync mode changed mid-run", "old", oldMode.String(), "new", mode.String())
-	b.suspend()
-	b.resume()
-}
-
 // SetBadBlockCallback sets the callback to run when a bad block is hit by the
 // block processor. This method is not thread safe and should be set only once
 // on startup before system events are fired.
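
With setMode gone, the mode no longer travels through the backfiller at all, and the Beacon* entry points below lose their SyncMode parameter accordingly. A hypothetical caller migrates like this (sketch only, not part of the patch):

    package callersketch // illustration only, assumes go-ethereum with this patch applied

    import (
    	"github.com/ethereum/go-ethereum/core/types"
    	"github.com/ethereum/go-ethereum/eth/downloader"
    )

    // startBeaconSync shows the migrated call shape: the explicit mode argument
    // is dropped and the downloader resolves the mode internally per sync cycle.
    func startBeaconSync(dl *downloader.Downloader, head, final *types.Header) error {
    	// Before this patch: dl.BeaconSync(ethconfig.SnapSync, head, final)
    	return dl.BeaconSync(head, final)
    }
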
@@ -153,8 +130,8 @@ func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
 //
 // Internally backfilling and state sync is done the same way, but the header
 // retrieval and scheduling is replaced.
-func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.Header) error {
-	return d.beaconSync(mode, head, final, true)
+func (d *Downloader) BeaconSync(head *types.Header, final *types.Header) error {
+	return d.beaconSync(head, final, true)
 }
 
 // BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
@@ -163,8 +140,8 @@ func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.
 //
 // This is useful if a beacon client is feeding us large chunks of payloads to run,
 // but is not setting the head after each.
-func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
-	return d.beaconSync(mode, head, nil, false)
+func (d *Downloader) BeaconExtend(head *types.Header) error {
+	return d.beaconSync(head, nil, false)
 }
 
 // beaconSync is the post-merge version of the chain synchronization, where the
@@ -173,20 +150,9 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
 //
 // Internally backfilling and state sync is done the same way, but the header
 // retrieval and scheduling is replaced.
-func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, final *types.Header, force bool) error {
-	// When the downloader starts a sync cycle, it needs to be aware of the sync
-	// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
-	// mode into the backfiller directly.
-	//
-	// Super crazy dangerous type cast. Should be fine (TM), we're only using a
-	// different backfiller implementation for skeleton tests.
-	d.skeleton.filler.(*beaconBackfiller).setMode(mode)
-
+func (d *Downloader) beaconSync(head *types.Header, final *types.Header, force bool) error {
 	// Signal the skeleton sync to switch to a new head, however it wants
-	if err := d.skeleton.Sync(head, final, force); err != nil {
-		return err
-	}
-	return nil
+	return d.skeleton.Sync(head, final, force)
 }
 
 // findBeaconAncestor tries to locate the common ancestor link of the local chain
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 09837a30450..d99fb0a1abc 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -97,8 +97,9 @@ type headerTask struct {
 }
 
 type Downloader struct {
-	mode atomic.Uint32  // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
-	mux  *event.TypeMux // Event multiplexer to announce sync operation events
+	mode  atomic.Uint32  // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
+	moder *moder         // Sync mode management, delivering the appropriate sync mode choice for each cycle
+	mux   *event.TypeMux // Event multiplexer to announce sync operation events
 
 	queue *queue   // Scheduler for selecting the hashes to download
 	peers *peerSet // Set of active peers from which download can proceed
@@ -165,6 +166,9 @@ type BlockChain interface {
 	// HasHeader verifies a header's presence in the local chain.
 	HasHeader(common.Hash, uint64) bool
 
+	// HasState checks whether the state trie is fully present in the database.
+	HasState(root common.Hash) bool
+
 	// GetHeaderByHash retrieves a header from the local chain.
 	GetHeaderByHash(common.Hash) *types.Header
 
@@ -221,10 +225,11 @@ type BlockChain interface {
 }
 
 // New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
+func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
 	cutoffNumber, cutoffHash := chain.HistoryPruningCutoff()
 	dl := &Downloader{
 		stateDB: stateDb,
+		moder:   newSyncModer(mode, chain, stateDb),
 		mux:     mux,
 		queue:   newQueue(blockCacheMaxItems, blockCacheInitialItems),
 		peers:   newPeerSet(),
@@ -331,7 +336,7 @@ func (d *Downloader) UnregisterPeer(id string) error {
 // synchronise will select the peer and use it for synchronising. If an empty string is given
 // it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
 // checks fail an error will be returned. This method is synchronous
-func (d *Downloader) synchronise(mode SyncMode, beaconPing chan struct{}) error {
+func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) {
 	// The beacon header syncer is async. It will start this synchronization and
 	// will continue doing other tasks. However, if synchronization needs to be
 	// cancelled, the syncer needs to know if we reached the startup point (and
@@ -356,6 +361,13 @@
 	if d.notified.CompareAndSwap(false, true) {
 		log.Info("Block synchronisation started")
 	}
+	mode := d.moder.getMode()
+	defer func() {
+		if err == nil && mode == ethconfig.SnapSync {
+			d.moder.disableSnap()
+			log.Info("Disabled the snap sync after the initial sync cycle")
+		}
+	}()
 	if mode == ethconfig.SnapSync {
 		// Snap sync will directly modify the persistent state, making the entire
 		// trie database unusable until the state is fully synced. To prevent any
@@ -399,6 +411,7 @@
 
 	// Atomically set the requested sync mode
 	d.mode.Store(uint32(mode))
+	defer d.mode.Store(0)
 
 	if beaconPing != nil {
 		close(beaconPing)
@@ -406,10 +419,16 @@
 	return d.syncToHead()
 }
 
+// getMode returns the sync mode used within the current cycle.
 func (d *Downloader) getMode() SyncMode {
 	return SyncMode(d.mode.Load())
 }
 
+// GetSyncMode returns the suggested sync mode.
+func (d *Downloader) GetSyncMode() SyncMode {
+	return d.moder.getMode()
+}
+
 // syncToHead starts a block synchronization based on the hash chain from
 // the specified head hash.
 func (d *Downloader) syncToHead() (err error) {
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index c1a31d6e1c2..7fa2522a3d4 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/eth/ethconfig"
 	"github.com/ethereum/go-ethereum/eth/protocols/eth"
 	"github.com/ethereum/go-ethereum/eth/protocols/snap"
 	"github.com/ethereum/go-ethereum/event"
@@ -49,12 +50,12 @@ type downloadTester struct {
 }
 
 // newTester creates a new downloader test mocker.
-func newTester(t *testing.T) *downloadTester {
-	return newTesterWithNotification(t, nil)
+func newTester(t *testing.T, mode ethconfig.SyncMode) *downloadTester {
+	return newTesterWithNotification(t, mode, nil)
 }
 
 // newTesterWithNotification creates a new downloader test mocker.
-func newTesterWithNotification(t *testing.T, success func()) *downloadTester {
+func newTesterWithNotification(t *testing.T, mode ethconfig.SyncMode, success func()) *downloadTester {
 	db, err := rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{})
 	if err != nil {
 		panic(err)
@@ -75,7 +76,7 @@ func newTesterWithNotification(t *testing.T, success func()) *downloadTester {
 		chain: chain,
 		peers: make(map[string]*downloadTesterPeer),
 	}
-	tester.downloader = New(db, new(event.TypeMux), tester.chain, tester.dropPeer, success)
+	tester.downloader = New(db, mode, new(event.TypeMux), tester.chain, tester.dropPeer, success)
 	return tester
 }
 
@@ -393,7 +394,7 @@ func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ETH
 
 func testCanonSync(t *testing.T, protocol uint, mode SyncMode) {
 	success := make(chan struct{})
-	tester := newTesterWithNotification(t, func() {
+	tester := newTesterWithNotification(t, mode, func() {
 		close(success)
 	})
 	defer tester.terminate()
@@ -403,7 +404,7 @@ func testCanonSync(t *testing.T, protocol uint, mode SyncMode) {
 	tester.newPeer("peer", protocol, chain.blocks[1:])
 
 	// Synchronise with the peer and make sure all relevant data was retrieved
-	if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
+	if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
 		t.Fatalf("failed to beacon-sync chain: %v", err)
 	}
 	select {
@@ -420,7 +421,7 @@ func TestThrottling68Full(t *testing.T) { testThrottling(t, eth.ETH68, FullSync)
 func TestThrottling68Snap(t *testing.T) { testThrottling(t, eth.ETH68, SnapSync) }
 
 func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
-	tester := newTester(t)
+	tester := newTester(t, mode)
 	defer tester.terminate()
 
 	// Create a long block chain to download and the tester
@@ -437,7 +438,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
 	// Start a synchronisation concurrently
 	errc := make(chan error, 1)
 	go func() {
-		errc <- tester.downloader.BeaconSync(mode, testChainBase.blocks[len(testChainBase.blocks)-1].Header(), nil)
+		errc <- tester.downloader.BeaconSync(testChainBase.blocks[len(testChainBase.blocks)-1].Header(), nil)
 	}()
 	// Iteratively take some blocks, always checking the retrieval count
 	for {
@@ -502,7 +503,7 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) {
 	success := func() {
 		close(complete)
 	}
-	tester := newTesterWithNotification(t, success)
+	tester := newTesterWithNotification(t, mode, success)
 	defer tester.terminate()
 
 	chain := testChainBase.shorten(MaxHeaderFetch)
@@ -514,7 +515,7 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) {
 		t.Errorf("download queue not idle")
 	}
 	// Synchronise with the peer, but cancel afterwards
-	if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
+	if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	<-complete
@@ -534,7 +535,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {
 	success := func() {
 		close(complete)
 	}
-	tester := newTesterWithNotification(t, success)
+	tester := newTesterWithNotification(t, mode, success)
 	defer tester.terminate()
 
 	// Create a small enough block chain to download
@@ -543,7 +544,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {
 
 	// Create peers of every type
 	tester.newPeer("peer 68", eth.ETH68, chain.blocks[1:])
-	if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
+	if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
 		t.Fatalf("failed to start beacon sync: %v", err)
 	}
 	select {
@@ -570,7 +571,7 @@ func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ET
 
 func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
 	success := make(chan struct{})
-	tester := newTesterWithNotification(t, func() {
+	tester := newTesterWithNotification(t, mode, func() {
 		close(success)
 	})
 	defer tester.terminate()
@@ -588,7 +589,7 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
 		receiptsHave.Add(int32(len(headers)))
 	}
 
-	if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
+	if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
 	}
 	select {
@@ -650,7 +651,7 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
 	for _, c := range cases {
 		t.Run(c.name, func(t *testing.T) {
 			success := make(chan struct{})
-			tester := newTesterWithNotification(t, func() {
+			tester := newTesterWithNotification(t, mode, func() {
 				close(success)
 			})
 			defer tester.terminate()
@@ -662,7 +663,7 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
 			if c.local > 0 {
 				tester.chain.InsertChain(chain.blocks[1 : c.local+1])
 			}
-			if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
+			if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
 				t.Fatalf("Failed to beacon sync chain %v %v", c.name, err)
 			}
 			select {
@@ -685,7 +686,7 @@ func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapS
 
 func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
 	success := make(chan struct{})
-	tester := newTesterWithNotification(t, func() {
+	tester := newTesterWithNotification(t, mode, func() {
 		success <- struct{}{}
 	})
 	defer tester.terminate()
@@ -700,7 +701,7 @@ func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
 		faultyPeer.withholdBodies[header.Hash()] = struct{}{}
 	}
 
-	if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)/2-1].Header(), nil); err != nil {
+	if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)/2-1].Header(), nil); err != nil {
 		t.Fatalf("failed to beacon-sync chain: %v", err)
 	}
 	select {
@@ -716,7 +717,7 @@ func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
 
 	// Synchronise all the blocks and check continuation progress
 	tester.newPeer("peer-full", protocol, chain.blocks[1:])
-	if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
+	if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
 		t.Fatalf("failed to beacon-sync chain: %v", err)
 	}
 	startingBlock := uint64(len(chain.blocks)/2 - 1)
diff --git a/eth/downloader/syncmode.go b/eth/downloader/syncmode.go
new file mode 100644
index 00000000000..b6300e2d6d1
--- /dev/null
+++ b/eth/downloader/syncmode.go
@@ -0,0 +1,116 @@
+// Copyright 2025 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"sync"
+
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/eth/ethconfig"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/log"
+)
+
+// moder is responsible for managing the downloader's sync mode. It takes the
+// user's preference at startup and then determines the appropriate sync mode
+// based on the current chain status.
+type moder struct {
+	mode  ethconfig.SyncMode
+	chain BlockChain
+	disk  ethdb.KeyValueReader
+	lock  sync.Mutex
+}
+
+func newSyncModer(mode ethconfig.SyncMode, chain BlockChain, disk ethdb.KeyValueReader) *moder {
+	if mode == ethconfig.FullSync {
+		// The database seems empty as the current block is the genesis. Yet the snap
+		// block is ahead, so snap sync was enabled for this node at a certain point.
+		// The scenarios where this can happen are:
+		// * the user manually (or via a bad block) rolled back a snap sync node
+		//   below the sync point.
+		// * the last snap sync was not finished while the user specified a full sync
+		//   this time, but we don't have any recent state for full sync.
+		// In these cases, however, it's safe to reenable snap sync.
+		fullBlock, snapBlock := chain.CurrentBlock(), chain.CurrentSnapBlock()
+		if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 {
+			mode = ethconfig.SnapSync
+			log.Warn("Switch sync mode from full sync to snap sync", "reason", "snap sync incomplete")
+		} else if !chain.HasState(fullBlock.Root) {
+			mode = ethconfig.SnapSync
+			log.Warn("Switch sync mode from full sync to snap sync", "reason", "head state missing")
+		} else {
+			// Grant the full sync mode
+			log.Info("Enabled full sync", "head", fullBlock.Number, "hash", fullBlock.Hash())
+		}
+	} else {
+		head := chain.CurrentBlock()
+		if head.Number.Uint64() > 0 && chain.HasState(head.Root) {
+			mode = ethconfig.FullSync
+			log.Info("Switch sync mode from snap sync to full sync", "reason", "snap sync complete")
+		} else {
+			// If snap sync was requested and our database is empty, grant it
+			log.Info("Enabled snap sync", "head", head.Number, "hash", head.Hash())
+		}
+	}
+	return &moder{
+		mode:  mode,
+		chain: chain,
+		disk:  disk,
+	}
+}
+
+// getMode retrieves the current sync mode, either explicitly set, or derived
+// from the chain status.
+func (m *moder) getMode() ethconfig.SyncMode {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	// If we're in snap sync mode, return that directly
+	if m.mode == ethconfig.SnapSync {
+		return ethconfig.SnapSync
+	}
+	// We are probably in full sync, but we might have rewound to before the
+	// snap sync pivot, check if we should re-enable snap sync.
+	head := m.chain.CurrentBlock()
+	if pivot := rawdb.ReadLastPivotNumber(m.disk); pivot != nil {
+		if head.Number.Uint64() < *pivot {
+			log.Info("Reenabled snap sync as chain is lagging behind the pivot", "head", head.Number, "pivot", pivot)
+			return ethconfig.SnapSync
+		}
+	}
+	// We are in a full sync, but the associated head state is missing. To complete
+	// the head state, forcefully rerun the snap sync. Note it doesn't mean the
+	// persistent state is corrupted, just mismatched with the head block.
+	if !m.chain.HasState(head.Root) {
+		log.Info("Reenabled snap sync as chain is stateless")
+		return ethconfig.SnapSync
+	}
+	// Nope, we're really full syncing
+	return ethconfig.FullSync
+}
+
+// disableSnap disables the snap sync mode; it's usually called after a
+// successful snap sync.
+func (m *moder) disableSnap() {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	if m.mode == ethconfig.FullSync {
+		return
+	}
+	m.mode = ethconfig.FullSync
+}
diff --git a/eth/handler.go b/eth/handler.go
index ff970e2ba62..43f26ef6636 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -111,9 +111,7 @@ type handlerConfig struct {
 type handler struct {
 	nodeID    enode.ID
 	networkID uint64
-
-	snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
-	synced   atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
+	synced    atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
 
 	database ethdb.Database
 	txpool   txPool
@@ -161,40 +159,13 @@ func newHandler(config *handlerConfig) (*handler, error) {
 		handlerDoneCh:  make(chan struct{}),
 		handlerStartCh: make(chan struct{}),
 	}
-	if config.Sync == ethconfig.FullSync {
-		// The database seems empty as the current block is the genesis. Yet the snap
-		// block is ahead, so snap sync was enabled for this node at a certain point.
-		// The scenarios where this can happen is
-		// * if the user manually (or via a bad block) rolled back a snap sync node
-		//   below the sync point.
-		// * the last snap sync is not finished while user specifies a full sync this
-		//   time. But we don't have any recent state for full sync.
-		// In these cases however it's safe to reenable snap sync.
-		fullBlock, snapBlock := h.chain.CurrentBlock(), h.chain.CurrentSnapBlock()
-		if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 {
-			h.snapSync.Store(true)
-			log.Warn("Switch sync mode from full sync to snap sync", "reason", "snap sync incomplete")
-		} else if !h.chain.HasState(fullBlock.Root) {
-			h.snapSync.Store(true)
-			log.Warn("Switch sync mode from full sync to snap sync", "reason", "head state missing")
-		}
-	} else {
-		head := h.chain.CurrentBlock()
-		if head.Number.Uint64() > 0 && h.chain.HasState(head.Root) {
-			log.Info("Switch sync mode from snap sync to full sync", "reason", "snap sync complete")
-		} else {
-			// If snap sync was requested and our database is empty, grant it
-			h.snapSync.Store(true)
-			log.Info("Enabled snap sync", "head", head.Number, "hash", head.Hash())
-		}
-	}
+	// Construct the downloader (long sync)
+	h.downloader = downloader.New(config.Database, config.Sync, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures)
+
 	// If snap sync is requested but snapshots are disabled, fail loudly
-	if h.snapSync.Load() && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
+	if h.downloader.GetSyncMode() == ethconfig.SnapSync && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
 		return nil, errors.New("snap sync not supported with snapshots disabled")
 	}
-	// Construct the downloader (long sync)
-	h.downloader = downloader.New(config.Database, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures)
-
 	fetchTx := func(peer string, hashes []common.Hash) error {
 		p := h.peers.peer(peer)
 		if p == nil {
@@ -267,7 +238,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
 		return err
 	}
 	reject := false // reserved peer slots
-	if h.snapSync.Load() {
+	if h.downloader.GetSyncMode() == ethconfig.SnapSync {
 		if snap == nil {
 			// If we are running snap-sync, we want to reserve roughly half the peer
 			// slots for peers supporting the snap protocol.
@@ -544,15 +515,7 @@ func (h *handler) txBroadcastLoop() {
 // enableSyncedFeatures enables the post-sync functionalities when the initial
 // sync is finished.
 func (h *handler) enableSyncedFeatures() {
-	// Mark the local node as synced.
 	h.synced.Store(true)
-
-	// If we were running snap sync and it finished, disable doing another
-	// round on next sync cycle
-	if h.snapSync.Load() {
-		log.Info("Snap sync complete, auto disabling")
-		h.snapSync.Store(false)
-	}
 }
 
 // blockRangeState holds the state of the block range update broadcasting mechanism.
@@ -590,7 +553,7 @@ func (h *handler) blockRangeLoop(st *blockRangeState) {
 			if ev == nil {
 				continue
 			}
-			if _, ok := ev.Data.(downloader.StartEvent); ok && h.snapSync.Load() {
+			if _, ok := ev.Data.(downloader.StartEvent); ok && h.downloader.GetSyncMode() == ethconfig.SnapSync {
 				h.blockRangeWhileSnapSyncing(st)
 			}
 		case <-st.headCh:
diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go
index 058a0d59499..1343cae03e7 100644
--- a/eth/handler_eth_test.go
+++ b/eth/handler_eth_test.go
@@ -232,7 +232,7 @@ func testRecvTransactions(t *testing.T, protocol uint) {
 	t.Parallel()
 
 	// Create a message handler, configure it to accept transactions and watch them
-	handler := newTestHandler()
+	handler := newTestHandler(ethconfig.FullSync)
 	defer handler.close()
 
 	handler.handler.synced.Store(true) // mark synced to accept transactions
@@ -284,7 +284,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
 	t.Parallel()
 
 	// Create a message handler and fill the pool with big transactions
-	handler := newTestHandler()
+	handler := newTestHandler(ethconfig.FullSync)
 	defer handler.close()
 
 	insert := make([]*types.Transaction, 100)
@@ -365,13 +365,12 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
 	// Create a source handler to send transactions from and a number of sinks
 	// to receive them. We need multiple sinks since a one-to-one peering would
 	// broadcast all transactions without announcement.
-	source := newTestHandler()
-	source.handler.snapSync.Store(false) // Avoid requiring snap, otherwise some will be dropped below
+	source := newTestHandler(ethconfig.FullSync)
 	defer source.close()
 
 	sinks := make([]*testHandler, 10)
 	for i := 0; i < len(sinks); i++ {
-		sinks[i] = newTestHandler()
+		sinks[i] = newTestHandler(ethconfig.FullSync)
 		defer sinks[i].close()
 
 		sinks[i].handler.synced.Store(true) // mark synced to accept transactions
diff --git a/eth/handler_test.go b/eth/handler_test.go
index b37e6227f42..312e5625ba2 100644
--- a/eth/handler_test.go
+++ b/eth/handler_test.go
@@ -174,13 +174,13 @@ type testHandler struct {
 }
 
 // newTestHandler creates a new handler for testing purposes with no blocks.
-func newTestHandler() *testHandler {
-	return newTestHandlerWithBlocks(0)
+func newTestHandler(mode ethconfig.SyncMode) *testHandler {
+	return newTestHandlerWithBlocks(0, mode)
 }
 
 // newTestHandlerWithBlocks creates a new handler for testing purposes, with a
 // given number of initial blocks.
-func newTestHandlerWithBlocks(blocks int) *testHandler {
+func newTestHandlerWithBlocks(blocks int, mode ethconfig.SyncMode) *testHandler {
 	// Create a database pre-initialized with a genesis block
 	db := rawdb.NewMemoryDatabase()
 	gspec := &core.Genesis{
@@ -200,7 +200,7 @@ func newTestHandlerWithBlocks(blocks int) *testHandler {
 		Chain:      chain,
 		TxPool:     txpool,
 		Network:    1,
-		Sync:       ethconfig.SnapSync,
+		Sync:       mode,
 		BloomCache: 1,
 	})
 	handler.Start(1000)
diff --git a/eth/sync_test.go b/eth/sync_test.go
index dc295f27904..0fb54a5e14f 100644
--- a/eth/sync_test.go
+++ b/eth/sync_test.go
@@ -36,17 +36,11 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) {
 	t.Parallel()
 
 	// Create an empty handler and ensure it's in snap sync mode
-	empty := newTestHandler()
-	if !empty.handler.snapSync.Load() {
-		t.Fatalf("snap sync disabled on pristine blockchain")
-	}
+	empty := newTestHandler(ethconfig.SnapSync)
 	defer empty.close()
 
 	// Create a full handler and ensure snap sync ends up disabled
-	full := newTestHandlerWithBlocks(1024)
-	if full.handler.snapSync.Load() {
-		t.Fatalf("snap sync not disabled on non-empty blockchain")
-	}
+	full := newTestHandlerWithBlocks(1024, ethconfig.SnapSync)
 	defer full.close()
 
 	// Sync up the two handlers via both `eth` and `snap`
@@ -85,7 +79,7 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) {
 	time.Sleep(250 * time.Millisecond)
 
 	// Check that snap sync was disabled
-	if err := empty.handler.downloader.BeaconSync(ethconfig.SnapSync, full.chain.CurrentBlock(), nil); err != nil {
+	if err := empty.handler.downloader.BeaconSync(full.chain.CurrentBlock(), nil); err != nil {
 		t.Fatal("sync failed:", err)
 	}
 	// Downloader internally has to wait for a timer (3s) to be expired before
@@ -96,7 +90,7 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) {
 	case <-timeout:
 		t.Fatalf("snap sync not disabled after successful synchronisation")
 	case <-time.After(100 * time.Millisecond):
-		if !empty.handler.snapSync.Load() {
+		if empty.handler.downloader.GetSyncMode() == ethconfig.FullSync {
 			return
 		}
 	}
diff --git a/eth/syncer/syncer.go b/eth/syncer/syncer.go
index 6b33ec54ba5..416bd62ed16 100644
--- a/eth/syncer/syncer.go
+++ b/eth/syncer/syncer.go
@@ -129,7 +129,11 @@ func (s *Syncer) run() {
 				break
 			}
 			if resync {
-				req.errc <- s.backend.Downloader().BeaconDevSync(ethconfig.FullSync, target)
+				if mode := s.backend.Downloader().GetSyncMode(); mode != ethconfig.FullSync {
+					req.errc <- fmt.Errorf("unsupported syncmode %v, please relaunch geth with --syncmode full", mode)
+				} else {
+					req.errc <- s.backend.Downloader().BeaconDevSync(target)
+				}
 			}
 
 		case <-ticker.C:
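
Stepping back from the individual hunks: the new moder centralizes the mode choice that was previously split between eth/backend.go and eth/handler.go. The standalone sketch below condenses its startup decision table (illustrative only; chainStatus and resolveMode are hypothetical names, not part of the patch):

    package modersketch // illustration only, assumes go-ethereum with this patch applied

    import "github.com/ethereum/go-ethereum/eth/ethconfig"

    // chainStatus is a hypothetical stand-in for the BlockChain queries that
    // newSyncModer performs at startup.
    type chainStatus struct {
    	headNumber uint64 // number of the current full block
    	snapNumber uint64 // number of the current snap block
    	hasState   bool   // whether the head state is present locally
    }

    // resolveMode condenses the newSyncModer decision table: a full-sync request
    // is overridden to snap when the chain is empty but a snap head exists, or
    // when the head state is missing; a snap-sync request is overridden to full
    // once a non-empty chain with its head state exists.
    func resolveMode(requested ethconfig.SyncMode, c chainStatus) ethconfig.SyncMode {
    	if requested == ethconfig.FullSync {
    		if c.headNumber == 0 && c.snapNumber > 0 {
    			return ethconfig.SnapSync // earlier snap sync never completed
    		}
    		if !c.hasState {
    			return ethconfig.SnapSync // head state missing, cannot full sync
    		}
    		return ethconfig.FullSync
    	}
    	if c.headNumber > 0 && c.hasState {
    		return ethconfig.FullSync // snap sync already completed
    	}
    	return ethconfig.SnapSync
    }

On top of this startup choice, the per-cycle getMode check shown in the patch reapplies the snap fallbacks (pivot rewind, missing head state) at the start of every sync cycle, and disableSnap pins the mode to full after the first successful snap cycle.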