Skip to content
This repository was archived by the owner on Nov 25, 2025. It is now read-only.

Commit a977649

Browse files
committed
feat(statesync): introduce Finalizer interface for syncer cleanup
Add a `Finalizer` interface to provide explicit cleanup operations for syncers. This ensures cleanup (like flushing batches to disk) is performed reliably even on cancellation or early returns.

- Add `Finalizer` interface to `sync/types.go` for explicit cleanup.
- Tie the existing `Finalize()` on `CodeQueue` (which finalizes code fetching) to this new interface.
- Gather finalization logic in a `Finalize()` for StateSyncer to flush in-progress trie batches.
- Implement `Finalize()` for AtomicSyncer to commit pending database changes.
- Add `FinalizeAll()` to SyncerRegistry and call it via `defer` to ensure cleanup runs.
- Remove `OnFailure` callback mechanism (replaced by `Finalizer`).

resolves #1089

Signed-off-by: Tsvetan Dimitrov ([email protected])
1 parent b0565c7 commit a977649

File tree

8 files changed

+54
-24
lines changed

8 files changed

+54
-24
lines changed

plugin/evm/atomic/sync/syncer.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const (
3333
var (
3434
_ sync.Syncer = (*Syncer)(nil)
3535
_ syncclient.LeafSyncTask = (*syncerLeafTask)(nil)
36+
_ sync.Finalizer = (*Syncer)(nil)
3637

3738
errTargetHeightRequired = errors.New("target height must be > 0")
3839
)
@@ -125,7 +126,6 @@ func NewSyncer(client syncclient.LeafClient, db *versiondb.Database, atomicTrie
125126
syncer.syncer = syncclient.NewCallbackLeafSyncer(client, tasks, &syncclient.LeafSyncerConfig{
126127
RequestSize: cfg.requestSize,
127128
NumWorkers: cfg.numWorkers,
128-
OnFailure: func() {}, // No-op since we flush progress to disk at the regular commit interval.
129129
})
130130

131131
return syncer, nil
@@ -146,6 +146,16 @@ func (s *Syncer) Sync(ctx context.Context) error {
146146
return s.syncer.Sync(ctx)
147147
}
148148

149+
// Finalize commits any pending database changes to disk.
150+
// This ensures that even if the sync is cancelled or fails, we preserve
151+
// the progress up to the last fully synced height.
152+
func (s *Syncer) Finalize() error {
153+
if s.db == nil {
154+
return nil
155+
}
156+
return s.db.Commit()
157+
}
158+
149159
// addZeroes returns the big-endian representation of `height`, prefixed with [common.HashLength] zeroes.
150160
func addZeroes(height uint64) []byte {
151161
// Key format is [height(8 bytes)][blockchainID(32 bytes)]. Start should be the

plugin/evm/vmsync/registry.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (r *SyncerRegistry) Register(syncer syncpkg.Syncer) error {
5353

5454
// RunSyncerTasks executes all registered syncers synchronously.
5555
func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syncable) error {
56+
// Ensure finalization runs regardless of how this function exits.
57+
// This guarantees cleanup even on early returns or panics.
58+
defer r.FinalizeAll(summary)
59+
5660
// Early return if context is already canceled (e.g., during shutdown).
5761
if err := ctx.Err(); err != nil {
5862
return err
@@ -102,3 +106,15 @@ func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncabl
102106

103107
return g
104108
}
109+
110+
// FinalizeAll iterates over all registered syncers and calls Finalize on those that implement the Finalizer interface.
111+
// Errors are logged but not returned to ensure best-effort cleanup of all syncers.
112+
func (r *SyncerRegistry) FinalizeAll(summary message.Syncable) {
113+
for _, task := range r.syncers {
114+
if f, ok := task.syncer.(syncpkg.Finalizer); ok {
115+
if err := f.Finalize(); err != nil {
116+
log.Error("failed to finalize syncer", "syncer", task.name, "err", err, "summary", summary.GetBlockHash().Hex(), "height", summary.Height())
117+
}
118+
}
119+
}
120+
}

sync/blocksync/syncer.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,6 @@ func (s *BlockSyncer) Sync(ctx context.Context) error {
7878
// first, check for blocks already available on disk so we don't
7979
// request them from peers.
8080
for blocksToFetch > 0 {
81-
// Check for context cancellation before checking each block.
82-
if err := ctx.Err(); err != nil {
83-
return err
84-
}
85-
8681
blk := rawdb.ReadBlock(s.db, nextHash, nextHeight)
8782
if blk == nil {
8883
// block was not found
@@ -99,11 +94,6 @@ func (s *BlockSyncer) Sync(ctx context.Context) error {
9994
// them to disk.
10095
batch := s.db.NewBatch()
10196
for fetched := uint64(0); fetched < blocksToFetch && (nextHash != common.Hash{}); {
102-
// Check for context cancellation before making network requests.
103-
if err := ctx.Err(); err != nil {
104-
return err
105-
}
106-
10797
log.Info("fetching blocks from peer", "fetched", fetched, "total", blocksToFetch)
10898
blocks, err := s.client.GetBlocks(ctx, nextHash, nextHeight, blocksPerRequest)
10999
if err != nil {

sync/client/leaf_syncer.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ type LeafSyncTask interface {
3737
type LeafSyncerConfig struct {
3838
RequestSize uint16 // Number of leafs to request from a peer at a time
3939
NumWorkers int // Number of workers to process leaf sync tasks
40-
OnFailure func() // Callback for handling errors during sync
4140
}
4241

4342
type CallbackLeafSyncer struct {
@@ -159,9 +158,9 @@ func (c *CallbackLeafSyncer) Sync(ctx context.Context) error {
159158
})
160159
}
161160

162-
err := eg.Wait()
163-
if err != nil {
164-
c.config.OnFailure()
161+
if err := eg.Wait(); err != nil {
162+
return err
165163
}
166-
return err
164+
165+
return nil
167166
}

sync/statesync/code_queue.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ import (
1515
"github.com/ava-labs/libevm/libevm/options"
1616

1717
"github.com/ava-labs/coreth/plugin/evm/customrawdb"
18+
syncpkg "github.com/ava-labs/coreth/sync"
1819
)
1920

2021
const defaultQueueCapacity = 5000
2122

2223
var (
24+
_ syncpkg.Finalizer = (*CodeQueue)(nil)
25+
2326
errFailedToAddCodeHashesToQueue = errors.New("failed to add code hashes to queue")
2427
errFailedToFinalizeCodeQueue = errors.New("failed to finalize code queue")
2528
)
@@ -175,6 +178,8 @@ func (q *CodeQueue) init() error {
175178
if err != nil {
176179
return fmt.Errorf("unable to recover previous sync state: %w", err)
177180
}
181+
// Use context.Background() since init() runs during construction before sync starts.
182+
// The channel is empty, so sends won't block. Shutdown is handled via q.quit in AddCode.
178183
if err := q.AddCode(context.Background(), dbCodeHashes); err != nil {
179184
return fmt.Errorf("unable to resume previous sync: %w", err)
180185
}

sync/statesync/state_syncer.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ func NewSyncer(client syncclient.Client, db ethdb.Database, root common.Hash, co
106106
ss.syncer = syncclient.NewCallbackLeafSyncer(client, ss.segments, &syncclient.LeafSyncerConfig{
107107
RequestSize: leafsRequestSize,
108108
NumWorkers: defaultNumWorkers,
109-
OnFailure: ss.onSyncFailure,
110109
})
111110

112111
if codeQueue == nil {
@@ -301,19 +300,19 @@ func (t *stateSync) removeTrieInProgress(root common.Hash) (int, error) {
301300
return len(t.triesInProgress), nil
302301
}
303302

304-
// onSyncFailure is called if the sync fails, this writes all
305-
// batches of in-progress trie segments to disk to have maximum
306-
// progress to restore.
307-
func (t *stateSync) onSyncFailure() {
303+
// Finalize checks if there are any in-progress tries and flushes their batches to disk
304+
// to preserve progress. The syncer registry calls this whenever its sync run exits, including on failure or cancellation.
305+
func (t *stateSync) Finalize() error {
308306
t.lock.RLock()
309307
defer t.lock.RUnlock()
310308

311309
for _, trie := range t.triesInProgress {
312310
for _, segment := range trie.segments {
313311
if err := segment.batch.Write(); err != nil {
314-
log.Error("failed to write segment batch on sync failure", "err", err)
315-
return
312+
log.Error("failed to write segment batch on finalize", "err", err)
313+
return err
316314
}
317315
}
318316
}
317+
return nil
319318
}

sync/statesync/trie_sync_tasks.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,13 @@ func (s *storageTrieTask) OnFinish() error {
143143
return s.sync.onStorageTrieFinished(s.root)
144144
}
145145

146-
func (s *storageTrieTask) OnLeafs(_ context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error {
146+
func (s *storageTrieTask) OnLeafs(ctx context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error {
147147
// persists the trie leafs to the snapshot for all accounts associated with this root
148148
for _, account := range s.accounts {
149+
// Check context cancellation before processing each account to allow early exit during shutdown.
150+
if err := ctx.Err(); err != nil {
151+
return err
152+
}
149153
for i, key := range keys {
150154
rawdb.WriteStorageSnapshot(db, account, common.BytesToHash(key), vals[i])
151155
}

sync/types.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ type Syncer interface {
3131
ID() string
3232
}
3333

34+
// Finalizer provides a mechanism to perform cleanup operations after a sync operation.
35+
// This is useful for handling inflight requests, flushing to disk, or other cleanup tasks.
36+
type Finalizer interface {
37+
// Finalize performs any necessary cleanup operations.
38+
Finalize() error
39+
}
40+
3441
// SummaryProvider is an interface for providing state summaries.
3542
type SummaryProvider interface {
3643
StateSummaryAtBlock(ethBlock *types.Block) (block.StateSummary, error)

0 commit comments

Comments
 (0)