diff --git a/plugin/evm/atomic/sync/syncer.go b/plugin/evm/atomic/sync/syncer.go index 89d44247f4..f7a368f19c 100644 --- a/plugin/evm/atomic/sync/syncer.go +++ b/plugin/evm/atomic/sync/syncer.go @@ -33,6 +33,7 @@ const ( var ( _ sync.Syncer = (*Syncer)(nil) _ syncclient.LeafSyncTask = (*syncerLeafTask)(nil) + _ sync.Finalizer = (*Syncer)(nil) errTargetHeightRequired = errors.New("target height must be > 0") ) @@ -125,7 +126,6 @@ func NewSyncer(client syncclient.LeafClient, db *versiondb.Database, atomicTrie syncer.syncer = syncclient.NewCallbackLeafSyncer(client, tasks, &syncclient.LeafSyncerConfig{ RequestSize: cfg.requestSize, NumWorkers: cfg.numWorkers, - OnFailure: func() {}, // No-op since we flush progress to disk at the regular commit interval. }) return syncer, nil @@ -146,6 +146,16 @@ func (s *Syncer) Sync(ctx context.Context) error { return s.syncer.Sync(ctx) } +// Finalize commits any pending database changes to disk. +// This ensures that even if the sync is cancelled or fails, we preserve +// the progress up to the last fully synced height. +func (s *Syncer) Finalize() error { + if s.db == nil { + return nil + } + return s.db.Commit() +} + // addZeroes returns the big-endian representation of `height`, prefixed with [common.HashLength] zeroes. func addZeroes(height uint64) []byte { // Key format is [height(8 bytes)][blockchainID(32 bytes)]. Start should be the diff --git a/plugin/evm/vmsync/registry.go b/plugin/evm/vmsync/registry.go index 7c38ae2d19..c7c211af38 100644 --- a/plugin/evm/vmsync/registry.go +++ b/plugin/evm/vmsync/registry.go @@ -53,6 +53,10 @@ func (r *SyncerRegistry) Register(syncer syncpkg.Syncer) error { // RunSyncerTasks executes all registered syncers synchronously. func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syncable) error { + // Ensure finalization runs regardless of how this function exits. + // This guarantees cleanup even on early returns or panics. + defer r.FinalizeAll(summary) + // Early return if context is already canceled (e.g., during shutdown). if err := ctx.Err(); err != nil { return err @@ -102,3 +106,15 @@ func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncabl return g } + +// FinalizeAll iterates over all registered syncers and calls Finalize on those that implement the Finalizer interface. +// Errors are logged but not returned to ensure best-effort cleanup of all syncers. +func (r *SyncerRegistry) FinalizeAll(summary message.Syncable) { + for _, task := range r.syncers { + if f, ok := task.syncer.(syncpkg.Finalizer); ok { + if err := f.Finalize(); err != nil { + log.Error("failed to finalize syncer", "syncer", task.name, "err", err, "summary", summary.GetBlockHash().Hex(), "height", summary.Height()) + } + } + } +} diff --git a/sync/client/leaf_syncer.go b/sync/client/leaf_syncer.go index 8f7a1b8626..b95708781c 100644 --- a/sync/client/leaf_syncer.go +++ b/sync/client/leaf_syncer.go @@ -37,7 +37,6 @@ type LeafSyncTask interface { type LeafSyncerConfig struct { RequestSize uint16 // Number of leafs to request from a peer at a time NumWorkers int // Number of workers to process leaf sync tasks - OnFailure func() // Callback for handling errors during sync } type CallbackLeafSyncer struct { @@ -159,9 +158,9 @@ func (c *CallbackLeafSyncer) Sync(ctx context.Context) error { }) } - err := eg.Wait() - if err != nil { - c.config.OnFailure() + if err := eg.Wait(); err != nil { + return err } - return err + + return nil } diff --git a/sync/statesync/code_queue.go b/sync/statesync/code_queue.go index 7c8c4c7197..1845267ec7 100644 --- a/sync/statesync/code_queue.go +++ b/sync/statesync/code_queue.go @@ -15,11 +15,15 @@ import ( "github.com/ava-labs/libevm/libevm/options" "github.com/ava-labs/coreth/plugin/evm/customrawdb" + + syncpkg "github.com/ava-labs/coreth/sync" ) const defaultQueueCapacity = 5000 var ( + _ syncpkg.Finalizer = (*CodeQueue)(nil) + errFailedToAddCodeHashesToQueue = errors.New("failed to add code hashes to queue") errFailedToFinalizeCodeQueue = errors.New("failed to finalize code queue") ) diff --git a/sync/statesync/state_syncer.go b/sync/statesync/state_syncer.go index 9976aceacf..c4e98ec2e6 100644 --- a/sync/statesync/state_syncer.go +++ b/sync/statesync/state_syncer.go @@ -106,7 +106,6 @@ func NewSyncer(client syncclient.Client, db ethdb.Database, root common.Hash, co ss.syncer = syncclient.NewCallbackLeafSyncer(client, ss.segments, &syncclient.LeafSyncerConfig{ RequestSize: leafsRequestSize, NumWorkers: defaultNumWorkers, - OnFailure: ss.onSyncFailure, }) if codeQueue == nil { @@ -301,19 +300,19 @@ func (t *stateSync) removeTrieInProgress(root common.Hash) (int, error) { return len(t.triesInProgress), nil } -// onSyncFailure is called if the sync fails, this writes all -// batches of in-progress trie segments to disk to have maximum -// progress to restore. -func (t *stateSync) onSyncFailure() { +// Finalize checks if there are any in-progress tries and flushes their batches to disk +// to preserve progress. This is called by the syncer registry on sync failure or cancellation. +func (t *stateSync) Finalize() error { t.lock.RLock() defer t.lock.RUnlock() for _, trie := range t.triesInProgress { for _, segment := range trie.segments { if err := segment.batch.Write(); err != nil { - log.Error("failed to write segment batch on sync failure", "err", err) - return + log.Error("failed to write segment batch on finalize", "err", err) + return err } } } + return nil } diff --git a/sync/types.go b/sync/types.go index d6e9005b06..93a3da652e 100644 --- a/sync/types.go +++ b/sync/types.go @@ -31,6 +31,13 @@ type Syncer interface { ID() string } +// Finalizer provides a mechanism to perform cleanup operations after a sync operation. +// This is useful for handling inflight requests, flushing to disk, or other cleanup tasks. +type Finalizer interface { + // Finalize performs any necessary cleanup operations. + Finalize() error +} + // SummaryProvider is an interface for providing state summaries. type SummaryProvider interface { StateSummaryAtBlock(ethBlock *types.Block) (block.StateSummary, error)