From 6700e625199cb4335b6302ed1ff8f0547f677daf Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Mon, 17 Nov 2025 15:13:54 +0200 Subject: [PATCH 01/14] fix(vmsync): log context cancellation as INFO instead of ERROR during shutdown During graceful shutdown, syncers cancelled via context cancellation were being logged as ERROR level. This is misleading since cancellation during shutdown is expected behavior, not an error condition. - Use `errors.Is()` to detect `context.Canceled` and `context.DeadlineExceeded` (handles wrapped errors) and log as INFO instead of ERROR - Separate `RunSyncerTasks()` logic into a synchronous wrapper and `StartAsync()` method for async execution to gain more flexibility and handle more use cases. - Add early return optimization when context is already cancelled. Test improvements: - Add tests for cancellation scenarios (`Canceled`, `DeadlineExceeded`, wrapped errors, early return). - Fix flakiness by adding WaitGroup synchronization and replacing channel-based coordination. - Refactor tests to use `t.Context()` and extract common helpers. resolves #1410 --- plugin/evm/vmsync/doubles_test.go | 16 +- plugin/evm/vmsync/registry.go | 48 +++-- plugin/evm/vmsync/registry_test.go | 282 +++++++++++++++++++++++++---- 3 files changed, 290 insertions(+), 56 deletions(-) diff --git a/plugin/evm/vmsync/doubles_test.go b/plugin/evm/vmsync/doubles_test.go index e0c6d3e030..f32c950aac 100644 --- a/plugin/evm/vmsync/doubles_test.go +++ b/plugin/evm/vmsync/doubles_test.go @@ -47,8 +47,12 @@ func NewBarrierSyncer(wg *sync.WaitGroup, releaseCh <-chan struct{}) FuncSyncer // NewErrorSyncer returns a syncer that waits until either `trigger` is closed // (then returns `errToReturn`) or `ctx` is canceled (then returns ctx.Err). -func NewErrorSyncer(trigger <-chan struct{}, errToReturn error) FuncSyncer { +// If `startedWG` is provided, it calls Done() when Sync begins. +func NewErrorSyncer(trigger <-chan struct{}, errToReturn error, startedWG *sync.WaitGroup) FuncSyncer { return FuncSyncer{fn: func(ctx context.Context) error { + if startedWG != nil { + startedWG.Done() + } select { case <-trigger: return errToReturn @@ -58,18 +62,18 @@ func NewErrorSyncer(trigger <-chan struct{}, errToReturn error) FuncSyncer { }} } -// NewCancelAwareSyncer closes `started` as soon as Sync begins, then waits for +// NewCancelAwareSyncer calls startedWG.Done() as soon as Sync begins, then waits for // either: -// - `ctx` cancellation: closes `canceled` and returns ctx.Err; or +// - `ctx` cancellation: calls canceledWG.Done() and returns ctx.Err; or // - `timeout` elapsing: returns an error indicating a timeout. // // Useful for asserting that cancellation propagates to the syncer under test. 
-func NewCancelAwareSyncer(started, canceled chan struct{}, timeout time.Duration) FuncSyncer { +func NewCancelAwareSyncer(startedWG, canceledWG *sync.WaitGroup, timeout time.Duration) FuncSyncer { return FuncSyncer{fn: func(ctx context.Context) error { - close(started) + startedWG.Done() select { case <-ctx.Done(): - close(canceled) + canceledWG.Done() return ctx.Err() case <-time.After(timeout): return errors.New("syncer timed out waiting for cancellation") diff --git a/plugin/evm/vmsync/registry.go b/plugin/evm/vmsync/registry.go index f6895807e3..493c648977 100644 --- a/plugin/evm/vmsync/registry.go +++ b/plugin/evm/vmsync/registry.go @@ -13,7 +13,7 @@ import ( "github.com/ava-labs/coreth/plugin/evm/message" - synccommon "github.com/ava-labs/coreth/sync" + syncpkg "github.com/ava-labs/coreth/sync" ) var errSyncerAlreadyRegistered = errors.New("syncer already registered") @@ -21,7 +21,7 @@ var errSyncerAlreadyRegistered = errors.New("syncer already registered") // SyncerTask represents a single syncer with its name for identification. type SyncerTask struct { name string - syncer synccommon.Syncer + syncer syncpkg.Syncer } // SyncerRegistry manages a collection of syncers for sequential execution. @@ -39,7 +39,7 @@ func NewSyncerRegistry() *SyncerRegistry { // Register adds a syncer to the registry. // Returns an error if a syncer with the same name is already registered. -func (r *SyncerRegistry) Register(syncer synccommon.Syncer) error { +func (r *SyncerRegistry) Register(syncer syncpkg.Syncer) error { id := syncer.ID() if r.registeredNames[id] { return fmt.Errorf("%w with id '%s'", errSyncerAlreadyRegistered, id) @@ -51,22 +51,46 @@ func (r *SyncerRegistry) Register(syncer synccommon.Syncer) error { return nil } -// RunSyncerTasks executes all registered syncers. -// The provided summary is used only for logging to decouple from concrete client types. +// RunSyncerTasks executes all registered syncers synchronously. func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syncable) error { + // Early return if context is already canceled (e.g., during shutdown). + if err := ctx.Err(); err != nil { + return err + } + + g := r.StartAsync(ctx, summary) + + if err := g.Wait(); err != nil { + return err + } + + log.Info("all syncers completed successfully", "count", len(r.syncers), "summary", summary.GetBlockHash().Hex()) + + return nil +} + +// StartAsync launches all registered syncers and returns an [errgroup.Group] +// whose Wait() completes when all syncers exit. The context returned will be +// cancelled when any syncer fails, propagating shutdown to the others. +func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncable) *errgroup.Group { + g, ctx := errgroup.WithContext(ctx) + if len(r.syncers) == 0 { - return nil + return g } summaryBlockHashHex := summary.GetBlockHash().Hex() blockHeight := summary.Height() - g, ctx := errgroup.WithContext(ctx) - for _, task := range r.syncers { g.Go(func() error { log.Info("starting syncer", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight) if err := task.syncer.Sync(ctx); err != nil { + // Context cancellation during shutdown is expected - log as INFO, not ERROR. 
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + log.Info("syncer cancelled", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight) + return err + } log.Error("failed syncing", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight, "err", err) return fmt.Errorf("%s failed: %w", task.name, err) } @@ -76,11 +100,5 @@ func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syn }) } - if err := g.Wait(); err != nil { - return err - } - - log.Info("all syncers completed successfully", "count", len(r.syncers), "summary", summaryBlockHashHex) - - return nil + return g } diff --git a/plugin/evm/vmsync/registry_test.go b/plugin/evm/vmsync/registry_test.go index 2a85661444..c1b56a81db 100644 --- a/plugin/evm/vmsync/registry_test.go +++ b/plugin/evm/vmsync/registry_test.go @@ -149,7 +149,7 @@ func TestSyncerRegistry_RunSyncerTasks(t *testing.T) { } }, }, { - name: "error returned", + name: "error returned and wrapped", syncers: []syncerConfig{ {"Syncer1", errFoo}, {"Syncer2", nil}, @@ -175,13 +175,19 @@ func TestSyncerRegistry_RunSyncerTasks(t *testing.T) { require.NoError(t, registry.Register(mockSyncer)) } - ctx, cancel := utilstest.NewTestContext(t) + ctx, cancel := context.WithCancel(t.Context()) t.Cleanup(cancel) err := registry.RunSyncerTasks(ctx, newTestClientSummary(t)) require.ErrorIs(t, err, tt.expectedError) + // Verify error wrapping for real errors (not cancellation). + if tt.expectedError != nil { + require.NotEqual(t, tt.expectedError, err, "error should be wrapped") + require.Contains(t, err.Error(), "Syncer1 failed", "error message should include syncer name") + } + // Use custom assertion function for each test case. tt.assertState(t, mockSyncers) }) @@ -193,7 +199,7 @@ func TestSyncerRegistry_ConcurrentStart(t *testing.T) { registry := NewSyncerRegistry() - ctx, cancel := utilstest.NewTestContext(t) + ctx, cancel := context.WithCancel(t.Context()) t.Cleanup(cancel) const numBarrierSyncers = 5 @@ -209,8 +215,7 @@ func TestSyncerRegistry_ConcurrentStart(t *testing.T) { require.NoError(t, registry.Register(s)) } - doneCh := make(chan error, 1) - go func() { doneCh <- registry.RunSyncerTasks(ctx, newTestClientSummary(t)) }() + doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) utilstest.WaitGroupWithTimeout(t, &allStartedWG, 2*time.Second, "timed out waiting for barrier syncers to start") close(releaseCh) @@ -223,44 +228,32 @@ func TestSyncerRegistry_ErrorPropagatesAndCancelsOthers(t *testing.T) { registry := NewSyncerRegistry() - ctx, cancel := utilstest.NewTestContext(t) + ctx, cancel := context.WithCancel(t.Context()) t.Cleanup(cancel) // Error syncer trigger := make(chan struct{}) errFirst := errors.New("test error") - require.NoError(t, registry.Register(&namedSyncer{name: "ErrorSyncer-0", syncer: NewErrorSyncer(trigger, errFirst)})) + var errorSyncerStartedWG sync.WaitGroup + errorSyncerStartedWG.Add(1) + require.NoError(t, registry.Register(&namedSyncer{name: "ErrorSyncer-0", syncer: NewErrorSyncer(trigger, errFirst, &errorSyncerStartedWG)})) // Cancel-aware syncers to verify cancellation propagation const numCancelSyncers = 2 - var cancelChans []chan struct{} - var startedChans []chan struct{} - - for i := 0; i < numCancelSyncers; i++ { - startedCh := make(chan struct{}) - canceledCh := make(chan struct{}) - cancelChans = append(cancelChans, canceledCh) - startedChans = append(startedChans, startedCh) - name := fmt.Sprintf("CancelSyncer-%d", i) - require.NoError(t, 
registry.Register(&namedSyncer{name: name, syncer: NewCancelAwareSyncer(startedCh, canceledCh, 4*time.Second)})) - } + startedWG, canceledWG := registerCancelAwareSyncers(t, registry, numCancelSyncers, 4*time.Second) - doneCh := make(chan error, 1) - go func() { doneCh <- registry.RunSyncerTasks(ctx, newTestClientSummary(t)) }() + doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) - // Ensure cancel-aware syncers are running before triggering the error - for i, started := range startedChans { - utilstest.WaitSignalWithTimeout(t, started, 2*time.Second, fmt.Sprintf("cancel-aware syncer %d did not start", i)) - } + // Ensure all syncers (error syncer and cancel-aware syncers) are running before triggering the error + utilstest.WaitGroupWithTimeout(t, &errorSyncerStartedWG, 2*time.Second, "timed out waiting for error syncer to start") + utilstest.WaitGroupWithTimeout(t, startedWG, 2*time.Second, "timed out waiting for cancel-aware syncers to start") close(trigger) err := utilstest.WaitErrWithTimeout(t, doneCh, 4*time.Second) require.ErrorIs(t, err, errFirst) - for i, cancelCh := range cancelChans { - utilstest.WaitSignalWithTimeout(t, cancelCh, 2*time.Second, fmt.Sprintf("cancellation was not propagated to cancel syncer %d", i)) - } + utilstest.WaitGroupWithTimeout(t, canceledWG, 2*time.Second, "cancellation was not propagated to all syncers") } func TestSyncerRegistry_FirstErrorWinsAcrossMany(t *testing.T) { @@ -268,13 +261,17 @@ func TestSyncerRegistry_FirstErrorWinsAcrossMany(t *testing.T) { registry := NewSyncerRegistry() - ctx, cancel := utilstest.NewTestContext(t) + ctx, cancel := context.WithCancel(t.Context()) t.Cleanup(cancel) const numErrorSyncers = 3 - var triggers []chan struct{} - var errFirst error + var ( + triggers []chan struct{} + errFirst error + allErrorSyncersStartedWG sync.WaitGroup + ) + allErrorSyncersStartedWG.Add(numErrorSyncers) for i := 0; i < numErrorSyncers; i++ { trigger := make(chan struct{}) @@ -284,13 +281,15 @@ func TestSyncerRegistry_FirstErrorWinsAcrossMany(t *testing.T) { errFirst = errInstance } name := fmt.Sprintf("ErrorSyncer-%d", i) - require.NoError(t, registry.Register(&namedSyncer{name: name, syncer: NewErrorSyncer(trigger, errInstance)})) + require.NoError(t, registry.Register(&namedSyncer{name: name, syncer: NewErrorSyncer(trigger, errInstance, &allErrorSyncersStartedWG)})) } - doneCh := make(chan error, 1) - go func() { doneCh <- registry.RunSyncerTasks(ctx, newTestClientSummary(t)) }() + doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) + + // Wait for all error syncers to start before triggering the error. + utilstest.WaitGroupWithTimeout(t, &allErrorSyncersStartedWG, 2*time.Second, "timed out waiting for error syncers to start") - // Trigger only the first error; others should return due to cancellation + // Trigger only the first error - others should return due to cancellation close(triggers[0]) err := utilstest.WaitErrWithTimeout(t, doneCh, 4*time.Second) @@ -301,12 +300,225 @@ func TestSyncerRegistry_NoSyncersRegistered(t *testing.T) { t.Parallel() registry := NewSyncerRegistry() - ctx, cancel := utilstest.NewTestContext(t) + ctx, cancel := context.WithCancel(t.Context()) t.Cleanup(cancel) require.NoError(t, registry.RunSyncerTasks(ctx, newTestClientSummary(t))) } +func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { + t.Parallel() + + // Test scenario that reproduces the issue where context cancellation during shutdown + // was being logged as ERROR instead of INFO. 
The fix ensures that: + // 1. Context cancellation errors are returned as-is (not wrapped). + // 2. Each syncer cancellation is logged as INFO, not ERROR. + // 3. The error propagates correctly to the caller. + + tests := []struct { + name string + wantErr error + numSyncers int + syncerTimeout time.Duration + timeout time.Duration // Timeout duration (only used when wantErr is DeadlineExceeded) + }{ + { + name: "context canceled during active sync", + wantErr: context.Canceled, + numSyncers: 3, + syncerTimeout: 5 * time.Second, + }, + { + name: "context deadline exceeded", + wantErr: context.DeadlineExceeded, + numSyncers: 1, + syncerTimeout: 500 * time.Millisecond, + timeout: 200 * time.Millisecond, // Increased to allow syncers to start before deadline expires + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + registry := NewSyncerRegistry() + + startedWG, canceledWG := registerCancelAwareSyncers(t, registry, tt.numSyncers, tt.syncerTimeout) + + var ( + ctx context.Context + cancel context.CancelFunc + ) + if tt.wantErr == context.DeadlineExceeded { + ctx, cancel = context.WithTimeout(t.Context(), tt.timeout) + } else { + ctx, cancel = context.WithCancel(t.Context()) + } + defer cancel() + + doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) + + // Wait for syncers to start. + waitTimeout := 2 * time.Second + if tt.wantErr == context.DeadlineExceeded { + // For deadline exceeded, use shorter timeout since deadline is tight. + waitTimeout = 100 * time.Millisecond + } + utilstest.WaitGroupWithTimeout(t, startedWG, waitTimeout, "timed out waiting for syncers to start") + + // Trigger cancellation for Canceled test (DeadlineExceeded will expire naturally). + if tt.wantErr == context.Canceled { + cancel() + } + + err := utilstest.WaitErrWithTimeout(t, doneCh, 4*time.Second) + + // Verify error is returned directly (not wrapped). + require.ErrorIs(t, err, tt.wantErr) + require.Equal(t, tt.wantErr, err, "error should be returned directly, not wrapped") + + // Verify that all syncers detected cancellation. + utilstest.WaitGroupWithTimeout(t, canceledWG, 2*time.Second, "cancellation was not propagated to all syncers") + }) + } +} + +func TestSyncerRegistry_EarlyReturnOnAlreadyCancelledContext(t *testing.T) { + t.Parallel() + + // Test scenario where the early return optimization is triggered when context is already cancelled + // before starting syncers (e.g., during shutdown). + + registry := NewSyncerRegistry() + + // Create and immediately cancel context. + ctx, cancel := context.WithCancel(t.Context()) + cancel() // Cancel before starting syncers to test early return. + + // Register syncers (they should never start). + const numSyncers = 2 + mockSyncers := make([]*mockSyncer, numSyncers) + for i := 0; i < numSyncers; i++ { + name := fmt.Sprintf("Syncer-%d", i) + mockSyncer := newMockSyncer(name, nil) + mockSyncers[i] = mockSyncer + require.NoError(t, registry.Register(mockSyncer)) + } + + // RunSyncerTasks should return immediately with context.Canceled. + err := registry.RunSyncerTasks(ctx, newTestClientSummary(t)) + + // Verify that context.Canceled is returned immediately. + require.ErrorIs(t, err, context.Canceled) + require.Equal(t, context.Canceled, err, "error should be context.Canceled directly, not wrapped") + + // Verify that syncers were never started (early return optimization). 
+ for i, mockSyncer := range mockSyncers { + require.False(t, mockSyncer.started, "Syncer %d should not have been started due to early return", i) + } +} + +func TestSyncerRegistry_MixedCancellationAndSuccess(t *testing.T) { + t.Parallel() + + // Test scenario where some syncers succeed and some get cancelled. + // This verifies that cancellation properly propagates to all syncers. + + registry := NewSyncerRegistry() + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + // Create a syncer that will succeed quickly. + releaseCh := make(chan struct{}) + var successWG sync.WaitGroup + successWG.Add(1) + require.NoError(t, registry.Register(&namedSyncer{ + name: "SuccessSyncer", + syncer: NewBarrierSyncer(&successWG, releaseCh), + })) + + // Create syncers that will be cancelled. + const numCancelSyncers = 2 + startedWG, canceledWG := registerCancelAwareSyncers(t, registry, numCancelSyncers, 5*time.Second) + + doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) + + utilstest.WaitGroupWithTimeout(t, &successWG, 2*time.Second, "success syncer did not start") + utilstest.WaitGroupWithTimeout(t, startedWG, 2*time.Second, "timed out waiting for syncers to start") + + // Cancel context, this should cancel all syncers, even the one that could succeed. + cancel() + + err := utilstest.WaitErrWithTimeout(t, doneCh, 4*time.Second) + + // Verify that the cancellation error is returned. + require.ErrorIs(t, err, context.Canceled) + + // Verify that all cancel-aware syncers detected cancellation. + utilstest.WaitGroupWithTimeout(t, canceledWG, 2*time.Second, "cancellation was not propagated to all syncers") +} + +func TestSyncerRegistry_WrappedContextCanceledError(t *testing.T) { + t.Parallel() + + // Test scenario that verifies that wrapped context.Canceled errors + // are correctly detected and logged as INFO, not ERROR. + + registry := NewSyncerRegistry() + + // Create a syncer that returns a wrapped context.Canceled error + // (simulating the real-world scenario from the original error) + wrappedErrorSyncer := FuncSyncer{ + fn: func(_ context.Context) error { + // Simulate the wrapped error pattern of for example the code syncer not being able to fetch a codehash. + return fmt.Errorf("could not get code (CodeRequest(...)): %w", context.Canceled) + }, + } + + require.NoError(t, registry.Register(&namedSyncer{ + name: "WrappedErrorSyncer", + syncer: wrappedErrorSyncer, + })) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + // Start syncers - the wrapped error should be detected by errors.Is(). + g := registry.StartAsync(ctx, newTestClientSummary(t)) + + err := g.Wait() + + // Verify that errors.Is correctly identifies the wrapped context.Canceled + require.ErrorIs(t, err, context.Canceled, "wrapped context.Canceled should be detected by errors.Is") +} + +// registerCancelAwareSyncers registers [numSyncers] cancel-aware syncers with the registry +// and returns WaitGroups for coordination. 
+func registerCancelAwareSyncers(t *testing.T, registry *SyncerRegistry, numSyncers int, timeout time.Duration) (*sync.WaitGroup, *sync.WaitGroup) { + t.Helper() + var startedWG, canceledWG sync.WaitGroup + startedWG.Add(numSyncers) + canceledWG.Add(numSyncers) + for i := 0; i < numSyncers; i++ { + require.NoError(t, registry.Register(&namedSyncer{ + name: fmt.Sprintf("Syncer-%d", i), + syncer: NewCancelAwareSyncer(&startedWG, &canceledWG, timeout), + })) + } + return &startedWG, &canceledWG +} + +// startSyncersAsync starts the syncers asynchronously using StartAsync and returns a channel to receive the error. +func startSyncersAsync(registry *SyncerRegistry, ctx context.Context, summary message.Syncable) <-chan error { + doneCh := make(chan error, 1) + g := registry.StartAsync(ctx, summary) + go func() { + doneCh <- g.Wait() + }() + return doneCh +} + func newTestClientSummary(t *testing.T) message.Syncable { t.Helper() summary, err := message.NewBlockSyncSummary(common.HexToHash("0xdeadbeef"), 1000, common.HexToHash("0xdeadbeef")) From 3b38cf07089e9e88af27f4b436ee89eeda4800f9 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Mon, 17 Nov 2025 17:31:16 +0200 Subject: [PATCH 02/14] fix(vmsync): improve on remarks --- plugin/evm/vmsync/registry.go | 2 +- plugin/evm/vmsync/registry_test.go | 12 +++--------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/plugin/evm/vmsync/registry.go b/plugin/evm/vmsync/registry.go index 493c648977..0aab6c6f61 100644 --- a/plugin/evm/vmsync/registry.go +++ b/plugin/evm/vmsync/registry.go @@ -86,7 +86,7 @@ func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncabl g.Go(func() error { log.Info("starting syncer", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight) if err := task.syncer.Sync(ctx); err != nil { - // Context cancellation during shutdown is expected - log as INFO, not ERROR. + // Context cancellation during shutdown is expected. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { log.Info("syncer cancelled", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight) return err diff --git a/plugin/evm/vmsync/registry_test.go b/plugin/evm/vmsync/registry_test.go index c1b56a81db..9831fd4e73 100644 --- a/plugin/evm/vmsync/registry_test.go +++ b/plugin/evm/vmsync/registry_test.go @@ -182,12 +182,6 @@ func TestSyncerRegistry_RunSyncerTasks(t *testing.T) { require.ErrorIs(t, err, tt.expectedError) - // Verify error wrapping for real errors (not cancellation). - if tt.expectedError != nil { - require.NotEqual(t, tt.expectedError, err, "error should be wrapped") - require.Contains(t, err.Error(), "Syncer1 failed", "error message should include syncer name") - } - // Use custom assertion function for each test case. tt.assertState(t, mockSyncers) }) @@ -354,7 +348,7 @@ func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { } else { ctx, cancel = context.WithCancel(t.Context()) } - defer cancel() + t.Cleanup(cancel) doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) @@ -482,14 +476,14 @@ func TestSyncerRegistry_WrappedContextCanceledError(t *testing.T) { })) ctx, cancel := context.WithCancel(t.Context()) - defer cancel() + t.Cleanup(cancel) // Start syncers - the wrapped error should be detected by errors.Is(). 
g := registry.StartAsync(ctx, newTestClientSummary(t)) err := g.Wait() - // Verify that errors.Is correctly identifies the wrapped context.Canceled + // Verify that errors.Is correctly identifies the wrapped context.Canceled. require.ErrorIs(t, err, context.Canceled, "wrapped context.Canceled should be detected by errors.Is") } From bbe4384ac5969ffb65c437e23b8c7a5d2d3b2ec5 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Mon, 17 Nov 2025 17:48:47 +0200 Subject: [PATCH 03/14] fix: remove checking if error is wrapped --- plugin/evm/vmsync/registry_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/plugin/evm/vmsync/registry_test.go b/plugin/evm/vmsync/registry_test.go index 9831fd4e73..c45e797a7a 100644 --- a/plugin/evm/vmsync/registry_test.go +++ b/plugin/evm/vmsync/registry_test.go @@ -149,7 +149,7 @@ func TestSyncerRegistry_RunSyncerTasks(t *testing.T) { } }, }, { - name: "error returned and wrapped", + name: "error returned", syncers: []syncerConfig{ {"Syncer1", errFoo}, {"Syncer2", nil}, @@ -367,9 +367,8 @@ func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { err := utilstest.WaitErrWithTimeout(t, doneCh, 4*time.Second) - // Verify error is returned directly (not wrapped). + // Verify error propagates correctly. require.ErrorIs(t, err, tt.wantErr) - require.Equal(t, tt.wantErr, err, "error should be returned directly, not wrapped") // Verify that all syncers detected cancellation. utilstest.WaitGroupWithTimeout(t, canceledWG, 2*time.Second, "cancellation was not propagated to all syncers") @@ -404,7 +403,6 @@ func TestSyncerRegistry_EarlyReturnOnAlreadyCancelledContext(t *testing.T) { // Verify that context.Canceled is returned immediately. require.ErrorIs(t, err, context.Canceled) - require.Equal(t, context.Canceled, err, "error should be context.Canceled directly, not wrapped") // Verify that syncers were never started (early return optimization). for i, mockSyncer := range mockSyncers { From 3ac161f07b42ab0fb1acd6cbd671cf3e5ef7b4e3 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Mon, 17 Nov 2025 18:04:02 +0200 Subject: [PATCH 04/14] style: improvements --- plugin/evm/vmsync/registry_test.go | 42 ++++++++++++------------------ 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/plugin/evm/vmsync/registry_test.go b/plugin/evm/vmsync/registry_test.go index c45e797a7a..d464b9f879 100644 --- a/plugin/evm/vmsync/registry_test.go +++ b/plugin/evm/vmsync/registry_test.go @@ -230,15 +230,16 @@ func TestSyncerRegistry_ErrorPropagatesAndCancelsOthers(t *testing.T) { errFirst := errors.New("test error") var errorSyncerStartedWG sync.WaitGroup errorSyncerStartedWG.Add(1) - require.NoError(t, registry.Register(&namedSyncer{name: "ErrorSyncer-0", syncer: NewErrorSyncer(trigger, errFirst, &errorSyncerStartedWG)})) + errorSyncer := &namedSyncer{name: "ErrorSyncer-0", syncer: NewErrorSyncer(trigger, errFirst, &errorSyncerStartedWG)} + require.NoError(t, registry.Register(errorSyncer)) - // Cancel-aware syncers to verify cancellation propagation + // Cancel-aware syncers to verify cancellation propagation. const numCancelSyncers = 2 startedWG, canceledWG := registerCancelAwareSyncers(t, registry, numCancelSyncers, 4*time.Second) doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) - // Ensure all syncers (error syncer and cancel-aware syncers) are running before triggering the error + // Ensure all syncers (error syncer and cancel-aware syncers) are running before triggering the error. 
utilstest.WaitGroupWithTimeout(t, &errorSyncerStartedWG, 2*time.Second, "timed out waiting for error syncer to start") utilstest.WaitGroupWithTimeout(t, startedWG, 2*time.Second, "timed out waiting for cancel-aware syncers to start") @@ -283,7 +284,7 @@ func TestSyncerRegistry_FirstErrorWinsAcrossMany(t *testing.T) { // Wait for all error syncers to start before triggering the error. utilstest.WaitGroupWithTimeout(t, &allErrorSyncersStartedWG, 2*time.Second, "timed out waiting for error syncers to start") - // Trigger only the first error - others should return due to cancellation + // Trigger only the first error - others should return due to cancellation. close(triggers[0]) err := utilstest.WaitErrWithTimeout(t, doneCh, 4*time.Second) @@ -327,7 +328,7 @@ func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { wantErr: context.DeadlineExceeded, numSyncers: 1, syncerTimeout: 500 * time.Millisecond, - timeout: 200 * time.Millisecond, // Increased to allow syncers to start before deadline expires + timeout: 200 * time.Millisecond, }, } @@ -379,9 +380,6 @@ func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { func TestSyncerRegistry_EarlyReturnOnAlreadyCancelledContext(t *testing.T) { t.Parallel() - // Test scenario where the early return optimization is triggered when context is already cancelled - // before starting syncers (e.g., during shutdown). - registry := NewSyncerRegistry() // Create and immediately cancel context. @@ -413,9 +411,6 @@ func TestSyncerRegistry_EarlyReturnOnAlreadyCancelledContext(t *testing.T) { func TestSyncerRegistry_MixedCancellationAndSuccess(t *testing.T) { t.Parallel() - // Test scenario where some syncers succeed and some get cancelled. - // This verifies that cancellation properly propagates to all syncers. - registry := NewSyncerRegistry() ctx, cancel := context.WithCancel(t.Context()) @@ -454,13 +449,10 @@ func TestSyncerRegistry_MixedCancellationAndSuccess(t *testing.T) { func TestSyncerRegistry_WrappedContextCanceledError(t *testing.T) { t.Parallel() - // Test scenario that verifies that wrapped context.Canceled errors - // are correctly detected and logged as INFO, not ERROR. - registry := NewSyncerRegistry() // Create a syncer that returns a wrapped context.Canceled error - // (simulating the real-world scenario from the original error) + // (simulating the real-world scenario from the original error). wrappedErrorSyncer := FuncSyncer{ fn: func(_ context.Context) error { // Simulate the wrapped error pattern of for example the code syncer not being able to fetch a codehash. @@ -485,6 +477,16 @@ func TestSyncerRegistry_WrappedContextCanceledError(t *testing.T) { require.ErrorIs(t, err, context.Canceled, "wrapped context.Canceled should be detected by errors.Is") } +// startSyncersAsync starts the syncers asynchronously using StartAsync and returns a channel to receive the error. +func startSyncersAsync(registry *SyncerRegistry, ctx context.Context, summary message.Syncable) <-chan error { + doneCh := make(chan error, 1) + g := registry.StartAsync(ctx, summary) + go func() { + doneCh <- g.Wait() + }() + return doneCh +} + // registerCancelAwareSyncers registers [numSyncers] cancel-aware syncers with the registry // and returns WaitGroups for coordination. 
func registerCancelAwareSyncers(t *testing.T, registry *SyncerRegistry, numSyncers int, timeout time.Duration) (*sync.WaitGroup, *sync.WaitGroup) { @@ -501,16 +503,6 @@ func registerCancelAwareSyncers(t *testing.T, registry *SyncerRegistry, numSynce return &startedWG, &canceledWG } -// startSyncersAsync starts the syncers asynchronously using StartAsync and returns a channel to receive the error. -func startSyncersAsync(registry *SyncerRegistry, ctx context.Context, summary message.Syncable) <-chan error { - doneCh := make(chan error, 1) - g := registry.StartAsync(ctx, summary) - go func() { - doneCh <- g.Wait() - }() - return doneCh -} - func newTestClientSummary(t *testing.T) message.Syncable { t.Helper() summary, err := message.NewBlockSyncSummary(common.HexToHash("0xdeadbeef"), 1000, common.HexToHash("0xdeadbeef")) From 26f9713a5ee7bf455de101ea311f763501ef3694 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Tue, 18 Nov 2025 12:36:30 +0200 Subject: [PATCH 05/14] fix: add context cancellation check to block syncer loop --- sync/blocksync/syncer.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sync/blocksync/syncer.go b/sync/blocksync/syncer.go index ca69a6dabc..8836075a69 100644 --- a/sync/blocksync/syncer.go +++ b/sync/blocksync/syncer.go @@ -94,6 +94,11 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { // them to disk. batch := s.db.NewBatch() for fetched := uint64(0); fetched < blocksToFetch && (nextHash != common.Hash{}); { + // Check for context cancellation before making network requests. + if err := ctx.Err(); err != nil { + return err + } + log.Info("fetching blocks from peer", "fetched", fetched, "total", blocksToFetch) blocks, err := s.client.GetBlocks(ctx, nextHash, nextHeight, blocksPerRequest) if err != nil { From e953c2185839369c7f6bbaacce5ebd3e08e9cc34 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Tue, 18 Nov 2025 13:07:48 +0200 Subject: [PATCH 06/14] fix(statesync): add context cancellation checks to prevent shutdown hang During graceful shutdown, the State Syncer was hanging because multiple blocking operations did not check context cancellation. When shutdown occurred, these operations would block indefinitely, preventing syncers from detecting cancellation and exiting gracefully. - Add context.Context parameter to LeafSyncTask.OnLeafs() interface to enable context propagation through the leaf processing call chain. - Update CodeQueue.AddCode() to accept context and check ctx.Done() before blocking on channel sends, preventing indefinite blocking when Code Syncer stops consuming during shutdown. - Update all OnLeafs implementations (mainTrieTask, storageTrieTask, trieSegment, atomic syncer) to accept and pass context through the call chain. - Add context parameter to startSyncing() and createSegments() methods, checking cancellation before blocking channel sends to the segments work queue. - Add context cancellation check in BlockSyncer before checking blocks on disk, ensuring it responds during the initial scan phase. - Update sync/client/leaf_syncer.go to pass context to OnLeafs() callbacks. This ensures all syncers detect cancellation immediately and exit gracefully instead of hanging until timeout. 
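For illustration, a minimal sketch of the cancellable-send pattern these changes apply. The helper below is hypothetical and not part of this change; the real call sites are CodeQueue.AddCode and trieToSync.startSyncing/createSegments in the diff that follows, which additionally select on an internal quit channel:

    package example // illustrative only

    import "context"

    // sendAll shows how a blocking channel send is made shutdown-aware by also
    // selecting on ctx.Done(): when the consumer stops reading during shutdown,
    // the sender returns ctx.Err() instead of blocking forever on a full channel.
    func sendAll[T any](ctx context.Context, out chan<- T, items []T) error {
        for _, item := range items {
            select {
            case out <- item:
                // Item accepted by the consumer.
            case <-ctx.Done():
                // Shutdown or timeout: stop sending and surface the cancellation.
                return ctx.Err()
            }
        }
        return nil
    }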
--- plugin/evm/atomic/sync/syncer.go | 10 ++++--- sync/blocksync/syncer.go | 5 ++++ sync/client/leaf_syncer.go | 18 ++++++------- sync/statesync/code_queue.go | 13 +++++++-- sync/statesync/code_queue_test.go | 8 +++--- sync/statesync/code_syncer_test.go | 2 +- sync/statesync/state_syncer.go | 11 ++++++-- sync/statesync/trie_segments.go | 43 ++++++++++++++++++++++-------- sync/statesync/trie_sync_tasks.go | 9 ++++--- 9 files changed, 83 insertions(+), 36 deletions(-) diff --git a/plugin/evm/atomic/sync/syncer.go b/plugin/evm/atomic/sync/syncer.go index 9d963f2899..89d44247f4 100644 --- a/plugin/evm/atomic/sync/syncer.go +++ b/plugin/evm/atomic/sync/syncer.go @@ -156,7 +156,11 @@ func addZeroes(height uint64) []byte { } // onLeafs is the callback for the leaf syncer, which will insert the key-value pairs into the trie. -func (s *Syncer) onLeafs(keys [][]byte, values [][]byte) error { +func (s *Syncer) onLeafs(ctx context.Context, keys [][]byte, values [][]byte) error { + if err := ctx.Err(); err != nil { + return err + } + for i, key := range keys { if len(key) != atomicstate.TrieKeyLength { return fmt.Errorf("unexpected key len (%d) in atomic trie sync", len(key)) @@ -239,6 +243,6 @@ func (a *syncerLeafTask) OnFinish(context.Context) error { return a.syncer.onFin func (*syncerLeafTask) OnStart() (bool, error) { return false, nil } func (a *syncerLeafTask) Root() common.Hash { return a.syncer.targetRoot } func (*syncerLeafTask) Account() common.Hash { return common.Hash{} } -func (a *syncerLeafTask) OnLeafs(keys, vals [][]byte) error { - return a.syncer.onLeafs(keys, vals) +func (a *syncerLeafTask) OnLeafs(ctx context.Context, keys, vals [][]byte) error { + return a.syncer.onLeafs(ctx, keys, vals) } diff --git a/sync/blocksync/syncer.go b/sync/blocksync/syncer.go index 8836075a69..4fee881dcc 100644 --- a/sync/blocksync/syncer.go +++ b/sync/blocksync/syncer.go @@ -78,6 +78,11 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { // first, check for blocks already available on disk so we don't // request them from peers. for blocksToFetch > 0 { + // Check for context cancellation before checking each block. + if err := ctx.Err(); err != nil { + return err + } + blk := rawdb.ReadBlock(s.db, nextHash, nextHeight) if blk == nil { // block was not found diff --git a/sync/client/leaf_syncer.go b/sync/client/leaf_syncer.go index 05199c7eb7..8f7a1b8626 100644 --- a/sync/client/leaf_syncer.go +++ b/sync/client/leaf_syncer.go @@ -24,14 +24,14 @@ var ErrFailedToFetchLeafs = errors.New("failed to fetch leafs") // the same value for Root, Account, Start, and NodeType throughout the sync. // The value returned by End can change between calls to OnLeafs. 
type LeafSyncTask interface { - Root() common.Hash // Root of the trie to sync - Account() common.Hash // Account hash of the trie to sync (only applicable to storage tries) - Start() []byte // Starting key to request new leaves - End() []byte // End key to request new leaves - NodeType() message.NodeType // Specifies the message type (atomic/state trie) for the leaf syncer to send - OnStart() (bool, error) // Callback when tasks begins, returns true if work can be skipped - OnLeafs(keys, vals [][]byte) error // Callback when new leaves are received from the network - OnFinish(ctx context.Context) error // Callback when there are no more leaves in the trie to sync or when we reach End() + Root() common.Hash // Root of the trie to sync + Account() common.Hash // Account hash of the trie to sync (only applicable to storage tries) + Start() []byte // Starting key to request new leaves + End() []byte // End key to request new leaves + NodeType() message.NodeType // Specifies the message type (atomic/state trie) for the leaf syncer to send + OnStart() (bool, error) // Callback when tasks begins, returns true if work can be skipped + OnLeafs(ctx context.Context, keys, vals [][]byte) error // Callback when new leaves are received from the network + OnFinish(ctx context.Context) error // Callback when there are no more leaves in the trie to sync or when we reach End() } type LeafSyncerConfig struct { @@ -128,7 +128,7 @@ func (c *CallbackLeafSyncer) syncTask(ctx context.Context, task LeafSyncTask) er leafsResponse.Vals = leafsResponse.Vals[:i+1] } - if err := task.OnLeafs(leafsResponse.Keys, leafsResponse.Vals); err != nil { + if err := task.OnLeafs(ctx, leafsResponse.Keys, leafsResponse.Vals); err != nil { return err } diff --git a/sync/statesync/code_queue.go b/sync/statesync/code_queue.go index af31f62988..0d8fc9570a 100644 --- a/sync/statesync/code_queue.go +++ b/sync/statesync/code_queue.go @@ -4,6 +4,7 @@ package statesync import ( + "context" "errors" "fmt" "sync" @@ -113,7 +114,7 @@ func (q *CodeQueue) closeChannelOnce() bool { // AddCode persists and enqueues new code hashes. // Persists idempotent "to-fetch" markers for all inputs and enqueues them as-is. // Returns errAddCodeAfterFinalize after a clean finalize and errFailedToAddCodeHashesToQueue on early quit. -func (q *CodeQueue) AddCode(codeHashes []common.Hash) error { +func (q *CodeQueue) AddCode(ctx context.Context, codeHashes []common.Hash) error { if len(codeHashes) == 0 { return nil } @@ -143,10 +144,18 @@ func (q *CodeQueue) AddCode(codeHashes []common.Hash) error { } for _, h := range codeHashes { + // Check context cancellation before attempting to send to avoid blocking + // on a full channel when shutdown occurs. 
+ if err := ctx.Err(); err != nil { + return err + } + select { case q.in <- h: // guaranteed to be open or nil, but never closed case <-q.quit: return errFailedToAddCodeHashesToQueue + case <-ctx.Done(): + return ctx.Err() } } return nil @@ -172,7 +181,7 @@ func (q *CodeQueue) init() error { if err != nil { return fmt.Errorf("unable to recover previous sync state: %w", err) } - if err := q.AddCode(dbCodeHashes); err != nil { + if err := q.AddCode(context.Background(), dbCodeHashes); err != nil { return fmt.Errorf("unable to resume previous sync: %w", err) } diff --git a/sync/statesync/code_queue_test.go b/sync/statesync/code_queue_test.go index f714401876..c936c15ab8 100644 --- a/sync/statesync/code_queue_test.go +++ b/sync/statesync/code_queue_test.go @@ -96,13 +96,13 @@ func TestCodeQueue(t *testing.T) { recvDone := make(chan struct{}) go func() { for _, add := range tt.addCode { - require.NoErrorf(t, q.AddCode(add), "%T.AddCode(%v)", q, add) + require.NoErrorf(t, q.AddCode(t.Context(), add), "%T.AddCode(%v)", q, add) } if tt.quitInsteadOfFinalize { close(quit) <-recvDone - require.ErrorIsf(t, q.AddCode(tt.addCodeAfter), errFailedToAddCodeHashesToQueue, "%T.AddCode() after `quit` channel closed", q) + require.ErrorIsf(t, q.AddCode(t.Context(), tt.addCodeAfter), errFailedToAddCodeHashesToQueue, "%T.AddCode() after `quit` channel closed", q) } else { require.NoErrorf(t, q.Finalize(), "%T.Finalize()", q) // Avoid leaking the internal goroutine @@ -165,7 +165,7 @@ func TestCodeQueue_FinalizeWaitsForInflightAddCodeCalls(t *testing.T) { addDone := make(chan error, 1) go func() { - addDone <- q.AddCode(hashes) + addDone <- q.AddCode(t.Context(), hashes) }() // Read the first enqueued hash to ensure AddCode is actively enqueuing and will block on the next send. @@ -238,7 +238,7 @@ func TestQuitAndAddCodeRace(t *testing.T) { <-start // Due to the race condition, AddCode may either succeed or fail // depending on whether the quit channel is closed first - _ = q.AddCode(in) + _ = q.AddCode(t.Context(), in) }() ready.Wait() diff --git a/sync/statesync/code_syncer_test.go b/sync/statesync/code_syncer_test.go index 58312bf99c..0cc89f7a71 100644 --- a/sync/statesync/code_syncer_test.go +++ b/sync/statesync/code_syncer_test.go @@ -69,7 +69,7 @@ func testCodeSyncer(t *testing.T, test codeSyncerTest) { require.NoError(t, err) go func() { for _, codeHashes := range test.codeRequestHashes { - if err := codeQueue.AddCode(codeHashes); err != nil { + if err := codeQueue.AddCode(t.Context(), codeHashes); err != nil { require.ErrorIs(t, err, test.err) } } diff --git a/sync/statesync/state_syncer.go b/sync/statesync/state_syncer.go index adcf1809d8..9976aceacf 100644 --- a/sync/statesync/state_syncer.go +++ b/sync/statesync/state_syncer.go @@ -127,7 +127,12 @@ func NewSyncer(client syncclient.Client, db ethdb.Database, root common.Hash, co return nil, err } ss.addTrieInProgress(ss.root, ss.mainTrie) - ss.mainTrie.startSyncing() // start syncing after tracking the trie as in progress + + // Use context.Background() for initialization since we don't have a sync context yet. + // This is safe because startSyncing is called before Sync() starts. 
+ if err := ss.mainTrie.startSyncing(context.Background()); err != nil { + return nil, err + } return ss, nil } @@ -264,7 +269,9 @@ func (t *stateSync) storageTrieProducer(ctx context.Context) error { close(t.storageTriesDone) } // start syncing after tracking the trie as in progress - storageTrie.startSyncing() + if err := storageTrie.startSyncing(ctx); err != nil { + return err + } if !more { return nil diff --git a/sync/statesync/trie_segments.go b/sync/statesync/trie_segments.go index e1dcaf9a32..3547a23f62 100644 --- a/sync/statesync/trie_segments.go +++ b/sync/statesync/trie_segments.go @@ -140,11 +140,22 @@ func (t *trieToSync) loadSegments() error { return it.Error() } -// startSyncing adds the trieToSync's segments to the work queue -func (t *trieToSync) startSyncing() { +// startSyncing adds the trieToSync's segments to the work queue. +func (t *trieToSync) startSyncing(ctx context.Context) error { for _, segment := range t.segments { - t.sync.segments <- segment // this will queue the segment for syncing + // Check context cancellation before attempting to send to avoid blocking + // on a full channel when shutdown occurs. + if err := ctx.Err(); err != nil { + return err + } + + select { + case t.sync.segments <- segment: + case <-ctx.Done(): + return ctx.Err() + } } + return nil } // addSegment appends a newly created segment specified by [start] and @@ -244,12 +255,12 @@ func (t *trieToSync) segmentFinished(ctx context.Context, idx int) error { // createSegmentsIfNeeded is called from the leaf handler. In case the trie syncing only has // one segment but a large number of leafs ([t.estimateSize() > segmentThreshold], it will // create [numSegments-1] additional segments to sync the trie. -func (t *trieToSync) createSegmentsIfNeeded(numSegments int) error { +func (t *trieToSync) createSegmentsIfNeeded(ctx context.Context, numSegments int) error { if !t.shouldSegment() { return nil } - return t.createSegments(numSegments) + return t.createSegments(ctx, numSegments) } // shouldSegment returns true if a trie should be separated into segments. @@ -276,7 +287,7 @@ func (t *trieToSync) shouldSegment() bool { // key of consecutive segments. // createSegments should only be called once when there is only one // thread accessing this trie, such that there is no need to hold a lock. -func (t *trieToSync) createSegments(numSegments int) error { +func (t *trieToSync) createSegments(ctx context.Context, numSegments int) error { segment := t.segments[0] segmentStep := 0x10000 / numSegments @@ -313,7 +324,17 @@ func (t *trieToSync) createSegments(numSegments int) error { // is already syncing. // this avoids concurrent access to [t.segments]. for i := 1; i < len(t.segments); i++ { - t.sync.segments <- t.segments[i] + // Check context cancellation before attempting to send to avoid blocking + // on a full channel when shutdown occurs. 
+ if err := ctx.Err(); err != nil { + return err + } + + select { + case t.sync.segments <- t.segments[i]: + case <-ctx.Done(): + return ctx.Err() + } } t.sync.stats.incTriesSegmented() log.Debug("statesync: trie segmented for parallel sync", "root", t.root, "account", t.account, "segments", len(t.segments)) @@ -359,9 +380,9 @@ func (t *trieSegment) Start() []byte { return t.start } -func (t *trieSegment) OnLeafs(keys, vals [][]byte) error { +func (t *trieSegment) OnLeafs(ctx context.Context, keys, vals [][]byte) error { // invoke the onLeafs callback - if err := t.trie.task.OnLeafs(t.batch, keys, vals); err != nil { + if err := t.trie.task.OnLeafs(ctx, t.batch, keys, vals); err != nil { return err } // cap the segment's batch @@ -381,9 +402,9 @@ func (t *trieSegment) OnLeafs(keys, vals [][]byte) error { t.trie.sync.stats.incLeafs(t, uint64(len(keys)), t.estimateSize()) if t.trie.root == t.trie.sync.root { - return t.trie.createSegmentsIfNeeded(numMainTrieSegments) + return t.trie.createSegmentsIfNeeded(ctx, numMainTrieSegments) } else { - return t.trie.createSegmentsIfNeeded(numStorageTrieSegments) + return t.trie.createSegmentsIfNeeded(ctx, numStorageTrieSegments) } } diff --git a/sync/statesync/trie_sync_tasks.go b/sync/statesync/trie_sync_tasks.go index d06931046e..455f75e00e 100644 --- a/sync/statesync/trie_sync_tasks.go +++ b/sync/statesync/trie_sync_tasks.go @@ -4,6 +4,7 @@ package statesync import ( + "context" "fmt" "github.com/ava-labs/libevm/common" @@ -31,7 +32,7 @@ type syncTask interface { // callbacks used to form a LeafSyncTask OnStart() (bool, error) - OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) error + OnLeafs(ctx context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error OnFinish() error } @@ -59,7 +60,7 @@ func (m *mainTrieTask) OnFinish() error { return m.sync.onMainTrieFinished() } -func (m *mainTrieTask) OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) error { +func (m *mainTrieTask) OnLeafs(ctx context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error { codeHashes := make([]common.Hash, 0) // loop over the keys, decode them as accounts, then check for any // storage or code we need to sync as well. @@ -88,7 +89,7 @@ func (m *mainTrieTask) OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) err } } // Add collected code hashes to the code fetcher. - return m.sync.codeQueue.AddCode(codeHashes) + return m.sync.codeQueue.AddCode(ctx, codeHashes) } type storageTrieTask struct { @@ -142,7 +143,7 @@ func (s *storageTrieTask) OnFinish() error { return s.sync.onStorageTrieFinished(s.root) } -func (s *storageTrieTask) OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) error { +func (s *storageTrieTask) OnLeafs(_ context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error { // persists the trie leafs to the snapshot for all accounts associated with this root for _, account := range s.accounts { for i, key := range keys { From 23ab197b1c85770f64feb1d3ecc3d926f20958e1 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Wed, 19 Nov 2025 12:35:42 +0200 Subject: [PATCH 07/14] fix: remove per-select context error checks --- sync/statesync/trie_segments.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sync/statesync/trie_segments.go b/sync/statesync/trie_segments.go index 3547a23f62..9d2b443389 100644 --- a/sync/statesync/trie_segments.go +++ b/sync/statesync/trie_segments.go @@ -143,12 +143,6 @@ func (t *trieToSync) loadSegments() error { // startSyncing adds the trieToSync's segments to the work queue. 
func (t *trieToSync) startSyncing(ctx context.Context) error { for _, segment := range t.segments { - // Check context cancellation before attempting to send to avoid blocking - // on a full channel when shutdown occurs. - if err := ctx.Err(); err != nil { - return err - } - select { case t.sync.segments <- segment: case <-ctx.Done(): @@ -324,12 +318,6 @@ func (t *trieToSync) createSegments(ctx context.Context, numSegments int) error // is already syncing. // this avoids concurrent access to [t.segments]. for i := 1; i < len(t.segments); i++ { - // Check context cancellation before attempting to send to avoid blocking - // on a full channel when shutdown occurs. - if err := ctx.Err(); err != nil { - return err - } - select { case t.sync.segments <- t.segments[i]: case <-ctx.Done(): From d7ec158264ee814ff5bd5b9d2d679b193e62c080 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Wed, 19 Nov 2025 12:56:14 +0200 Subject: [PATCH 08/14] test(vmsync): simplify test helpers and remove redundant cancellation tracking - Remove canceledWG from NewCancelAwareSyncer() and related test assertions. - Unify and simplify function comments. - Make startedWG nil-safe in all helpers. - Remove TestSyncerRegistry_WrappedContextCanceledError. --- plugin/evm/vmsync/doubles_test.go | 36 +++++++--------- plugin/evm/vmsync/registry_test.go | 66 +++++------------------------- 2 files changed, 25 insertions(+), 77 deletions(-) diff --git a/plugin/evm/vmsync/doubles_test.go b/plugin/evm/vmsync/doubles_test.go index f32c950aac..39fcfde563 100644 --- a/plugin/evm/vmsync/doubles_test.go +++ b/plugin/evm/vmsync/doubles_test.go @@ -27,15 +27,13 @@ func (FuncSyncer) ID() string { return "test_id" } var _ syncpkg.Syncer = FuncSyncer{} -// NewBarrierSyncer returns a syncer that, upon entering Sync, calls wg.Done() to -// signal it has started, then blocks until either: -// - `releaseCh` is closed, returning nil; or -// - `ctx` is canceled, returning ctx.Err. -// -// This acts as a barrier to coordinate test goroutines. -func NewBarrierSyncer(wg *sync.WaitGroup, releaseCh <-chan struct{}) FuncSyncer { +// NewBarrierSyncer returns a syncer that signals startedWG.Done() when Sync begins, +// then blocks until releaseCh is closed (returns nil) or ctx is canceled (returns ctx.Err). +func NewBarrierSyncer(startedWG *sync.WaitGroup, releaseCh <-chan struct{}) FuncSyncer { return FuncSyncer{fn: func(ctx context.Context) error { - wg.Done() + if startedWG != nil { + startedWG.Done() + } select { case <-releaseCh: return nil @@ -45,10 +43,9 @@ func NewBarrierSyncer(wg *sync.WaitGroup, releaseCh <-chan struct{}) FuncSyncer }} } -// NewErrorSyncer returns a syncer that waits until either `trigger` is closed -// (then returns `errToReturn`) or `ctx` is canceled (then returns ctx.Err). -// If `startedWG` is provided, it calls Done() when Sync begins. -func NewErrorSyncer(trigger <-chan struct{}, errToReturn error, startedWG *sync.WaitGroup) FuncSyncer { +// NewErrorSyncer returns a syncer that signals startedWG.Done() when Sync begins, +// then blocks until trigger is closed (returns errToReturn) or ctx is canceled (returns ctx.Err). +func NewErrorSyncer(startedWG *sync.WaitGroup, trigger <-chan struct{}, errToReturn error) FuncSyncer { return FuncSyncer{fn: func(ctx context.Context) error { if startedWG != nil { startedWG.Done() @@ -62,18 +59,15 @@ func NewErrorSyncer(trigger <-chan struct{}, errToReturn error, startedWG *sync. 
}} } -// NewCancelAwareSyncer calls startedWG.Done() as soon as Sync begins, then waits for -// either: -// - `ctx` cancellation: calls canceledWG.Done() and returns ctx.Err; or -// - `timeout` elapsing: returns an error indicating a timeout. -// -// Useful for asserting that cancellation propagates to the syncer under test. -func NewCancelAwareSyncer(startedWG, canceledWG *sync.WaitGroup, timeout time.Duration) FuncSyncer { +// NewCancelAwareSyncer returns a syncer that signals startedWG.Done() when Sync begins, +// then blocks until ctx is canceled (returns ctx.Err) or timeout elapses (returns timeout error). +func NewCancelAwareSyncer(startedWG *sync.WaitGroup, timeout time.Duration) FuncSyncer { return FuncSyncer{fn: func(ctx context.Context) error { - startedWG.Done() + if startedWG != nil { + startedWG.Done() + } select { case <-ctx.Done(): - canceledWG.Done() return ctx.Err() case <-time.After(timeout): return errors.New("syncer timed out waiting for cancellation") diff --git a/plugin/evm/vmsync/registry_test.go b/plugin/evm/vmsync/registry_test.go index d464b9f879..0d68e114c7 100644 --- a/plugin/evm/vmsync/registry_test.go +++ b/plugin/evm/vmsync/registry_test.go @@ -230,12 +230,12 @@ func TestSyncerRegistry_ErrorPropagatesAndCancelsOthers(t *testing.T) { errFirst := errors.New("test error") var errorSyncerStartedWG sync.WaitGroup errorSyncerStartedWG.Add(1) - errorSyncer := &namedSyncer{name: "ErrorSyncer-0", syncer: NewErrorSyncer(trigger, errFirst, &errorSyncerStartedWG)} + errorSyncer := &namedSyncer{name: "ErrorSyncer-0", syncer: NewErrorSyncer(&errorSyncerStartedWG, trigger, errFirst)} require.NoError(t, registry.Register(errorSyncer)) // Cancel-aware syncers to verify cancellation propagation. const numCancelSyncers = 2 - startedWG, canceledWG := registerCancelAwareSyncers(t, registry, numCancelSyncers, 4*time.Second) + startedWG := registerCancelAwareSyncers(t, registry, numCancelSyncers, 4*time.Second) doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) @@ -247,8 +247,6 @@ func TestSyncerRegistry_ErrorPropagatesAndCancelsOthers(t *testing.T) { err := utilstest.WaitErrWithTimeout(t, doneCh, 4*time.Second) require.ErrorIs(t, err, errFirst) - - utilstest.WaitGroupWithTimeout(t, canceledWG, 2*time.Second, "cancellation was not propagated to all syncers") } func TestSyncerRegistry_FirstErrorWinsAcrossMany(t *testing.T) { @@ -276,7 +274,7 @@ func TestSyncerRegistry_FirstErrorWinsAcrossMany(t *testing.T) { errFirst = errInstance } name := fmt.Sprintf("ErrorSyncer-%d", i) - require.NoError(t, registry.Register(&namedSyncer{name: name, syncer: NewErrorSyncer(trigger, errInstance, &allErrorSyncersStartedWG)})) + require.NoError(t, registry.Register(&namedSyncer{name: name, syncer: NewErrorSyncer(&allErrorSyncersStartedWG, trigger, errInstance)})) } doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) @@ -304,12 +302,6 @@ func TestSyncerRegistry_NoSyncersRegistered(t *testing.T) { func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { t.Parallel() - // Test scenario that reproduces the issue where context cancellation during shutdown - // was being logged as ERROR instead of INFO. The fix ensures that: - // 1. Context cancellation errors are returned as-is (not wrapped). - // 2. Each syncer cancellation is logged as INFO, not ERROR. - // 3. The error propagates correctly to the caller. 
- tests := []struct { name string wantErr error @@ -338,7 +330,7 @@ func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { registry := NewSyncerRegistry() - startedWG, canceledWG := registerCancelAwareSyncers(t, registry, tt.numSyncers, tt.syncerTimeout) + startedWG := registerCancelAwareSyncers(t, registry, tt.numSyncers, tt.syncerTimeout) var ( ctx context.Context @@ -370,9 +362,6 @@ func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { // Verify error propagates correctly. require.ErrorIs(t, err, tt.wantErr) - - // Verify that all syncers detected cancellation. - utilstest.WaitGroupWithTimeout(t, canceledWG, 2*time.Second, "cancellation was not propagated to all syncers") }) } } @@ -427,7 +416,7 @@ func TestSyncerRegistry_MixedCancellationAndSuccess(t *testing.T) { // Create syncers that will be cancelled. const numCancelSyncers = 2 - startedWG, canceledWG := registerCancelAwareSyncers(t, registry, numCancelSyncers, 5*time.Second) + startedWG := registerCancelAwareSyncers(t, registry, numCancelSyncers, 5*time.Second) doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) @@ -441,40 +430,6 @@ func TestSyncerRegistry_MixedCancellationAndSuccess(t *testing.T) { // Verify that the cancellation error is returned. require.ErrorIs(t, err, context.Canceled) - - // Verify that all cancel-aware syncers detected cancellation. - utilstest.WaitGroupWithTimeout(t, canceledWG, 2*time.Second, "cancellation was not propagated to all syncers") -} - -func TestSyncerRegistry_WrappedContextCanceledError(t *testing.T) { - t.Parallel() - - registry := NewSyncerRegistry() - - // Create a syncer that returns a wrapped context.Canceled error - // (simulating the real-world scenario from the original error). - wrappedErrorSyncer := FuncSyncer{ - fn: func(_ context.Context) error { - // Simulate the wrapped error pattern of for example the code syncer not being able to fetch a codehash. - return fmt.Errorf("could not get code (CodeRequest(...)): %w", context.Canceled) - }, - } - - require.NoError(t, registry.Register(&namedSyncer{ - name: "WrappedErrorSyncer", - syncer: wrappedErrorSyncer, - })) - - ctx, cancel := context.WithCancel(t.Context()) - t.Cleanup(cancel) - - // Start syncers - the wrapped error should be detected by errors.Is(). - g := registry.StartAsync(ctx, newTestClientSummary(t)) - - err := g.Wait() - - // Verify that errors.Is correctly identifies the wrapped context.Canceled. - require.ErrorIs(t, err, context.Canceled, "wrapped context.Canceled should be detected by errors.Is") } // startSyncersAsync starts the syncers asynchronously using StartAsync and returns a channel to receive the error. @@ -488,19 +443,18 @@ func startSyncersAsync(registry *SyncerRegistry, ctx context.Context, summary me } // registerCancelAwareSyncers registers [numSyncers] cancel-aware syncers with the registry -// and returns WaitGroups for coordination. -func registerCancelAwareSyncers(t *testing.T, registry *SyncerRegistry, numSyncers int, timeout time.Duration) (*sync.WaitGroup, *sync.WaitGroup) { +// and returns a WaitGroup to coordinate when syncers have started. 
+func registerCancelAwareSyncers(t *testing.T, registry *SyncerRegistry, numSyncers int, timeout time.Duration) *sync.WaitGroup { t.Helper() - var startedWG, canceledWG sync.WaitGroup + var startedWG sync.WaitGroup startedWG.Add(numSyncers) - canceledWG.Add(numSyncers) for i := 0; i < numSyncers; i++ { require.NoError(t, registry.Register(&namedSyncer{ name: fmt.Sprintf("Syncer-%d", i), - syncer: NewCancelAwareSyncer(&startedWG, &canceledWG, timeout), + syncer: NewCancelAwareSyncer(&startedWG, timeout), })) } - return &startedWG, &canceledWG + return &startedWG } func newTestClientSummary(t *testing.T) message.Syncable { From 2219e78ec446d40b7746555fb80e44f722319f0e Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Wed, 19 Nov 2025 13:09:54 +0200 Subject: [PATCH 09/14] fix(statesync): remove pre-select context err check in code_queue.go --- sync/statesync/code_queue.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sync/statesync/code_queue.go b/sync/statesync/code_queue.go index 0d8fc9570a..e0bda7c905 100644 --- a/sync/statesync/code_queue.go +++ b/sync/statesync/code_queue.go @@ -144,12 +144,6 @@ func (q *CodeQueue) AddCode(ctx context.Context, codeHashes []common.Hash) error } for _, h := range codeHashes { - // Check context cancellation before attempting to send to avoid blocking - // on a full channel when shutdown occurs. - if err := ctx.Err(); err != nil { - return err - } - select { case q.in <- h: // guaranteed to be open or nil, but never closed case <-q.quit: From c3e15902f0abcf77aee3055280b0d405e3b35724 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Thu, 20 Nov 2025 15:49:41 +0200 Subject: [PATCH 10/14] fix: small remarks --- plugin/evm/vmsync/registry_test.go | 18 +++++++++--------- sync/statesync/code_queue_test.go | 3 ++- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/plugin/evm/vmsync/registry_test.go b/plugin/evm/vmsync/registry_test.go index 0d68e114c7..dd0068ea06 100644 --- a/plugin/evm/vmsync/registry_test.go +++ b/plugin/evm/vmsync/registry_test.go @@ -332,15 +332,7 @@ func TestSyncerRegistry_ContextCancellationErrors(t *testing.T) { startedWG := registerCancelAwareSyncers(t, registry, tt.numSyncers, tt.syncerTimeout) - var ( - ctx context.Context - cancel context.CancelFunc - ) - if tt.wantErr == context.DeadlineExceeded { - ctx, cancel = context.WithTimeout(t.Context(), tt.timeout) - } else { - ctx, cancel = context.WithCancel(t.Context()) - } + ctx, cancel := newTestContext(t, tt.wantErr, tt.timeout) t.Cleanup(cancel) doneCh := startSyncersAsync(registry, ctx, newTestClientSummary(t)) @@ -457,6 +449,14 @@ func registerCancelAwareSyncers(t *testing.T, registry *SyncerRegistry, numSynce return &startedWG } +func newTestContext(t *testing.T, wantErr error, timeout time.Duration) (context.Context, context.CancelFunc) { + t.Helper() + if wantErr == context.DeadlineExceeded { + return context.WithTimeout(t.Context(), timeout) + } + return context.WithCancel(t.Context()) +} + func newTestClientSummary(t *testing.T) message.Syncable { t.Helper() summary, err := message.NewBlockSyncSummary(common.HexToHash("0xdeadbeef"), 1000, common.HexToHash("0xdeadbeef")) diff --git a/sync/statesync/code_queue_test.go b/sync/statesync/code_queue_test.go index c936c15ab8..74da3e7731 100644 --- a/sync/statesync/code_queue_test.go +++ b/sync/statesync/code_queue_test.go @@ -102,7 +102,8 @@ func TestCodeQueue(t *testing.T) { if tt.quitInsteadOfFinalize { close(quit) <-recvDone - require.ErrorIsf(t, q.AddCode(t.Context(), tt.addCodeAfter), 
errFailedToAddCodeHashesToQueue, "%T.AddCode() after `quit` channel closed", q) + err := q.AddCode(t.Context(), tt.addCodeAfter) + require.ErrorIsf(t, err, errFailedToAddCodeHashesToQueue, "%T.AddCode() after `quit` channel closed", q) } else { require.NoErrorf(t, q.Finalize(), "%T.Finalize()", q) // Avoid leaking the internal goroutine From cf30dfe502e9801b6d4500ac921417adbba9ff5f Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Fri, 21 Nov 2025 16:43:24 +0200 Subject: [PATCH 11/14] fix(statesync): add context cancellation check in storageTrieTask.OnLeafs - Add context cancellation check in `storageTrieTask.OnLeafs` before processing each account to allow early exit during shutdown. - Add comment explaining `context.Background()` usage in `CodeQueue.init()` since it runs during construction before sync starts. --- sync/statesync/code_queue.go | 2 ++ sync/statesync/trie_sync_tasks.go | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/sync/statesync/code_queue.go b/sync/statesync/code_queue.go index e0bda7c905..7c8c4c7197 100644 --- a/sync/statesync/code_queue.go +++ b/sync/statesync/code_queue.go @@ -175,6 +175,8 @@ func (q *CodeQueue) init() error { if err != nil { return fmt.Errorf("unable to recover previous sync state: %w", err) } + // Use context.Background() since init() runs during construction before sync starts. + // The channel is empty, so sends won't block. Shutdown is handled via q.quit in AddCode. if err := q.AddCode(context.Background(), dbCodeHashes); err != nil { return fmt.Errorf("unable to resume previous sync: %w", err) } diff --git a/sync/statesync/trie_sync_tasks.go b/sync/statesync/trie_sync_tasks.go index 455f75e00e..556254a418 100644 --- a/sync/statesync/trie_sync_tasks.go +++ b/sync/statesync/trie_sync_tasks.go @@ -143,9 +143,13 @@ func (s *storageTrieTask) OnFinish() error { return s.sync.onStorageTrieFinished(s.root) } -func (s *storageTrieTask) OnLeafs(_ context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error { +func (s *storageTrieTask) OnLeafs(ctx context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error { // persists the trie leafs to the snapshot for all accounts associated with this root for _, account := range s.accounts { + // Check context cancellation before processing each account to allow early exit during shutdown. + if err := ctx.Err(); err != nil { + return err + } for i, key := range keys { rawdb.WriteStorageSnapshot(db, account, common.BytesToHash(key), vals[i]) } From ee19498b6879feaa34fd767babd7ca0373d0d666 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Fri, 21 Nov 2025 16:47:54 +0200 Subject: [PATCH 12/14] fix(blocksync): remove redundant context checks --- sync/blocksync/syncer.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sync/blocksync/syncer.go b/sync/blocksync/syncer.go index 4fee881dcc..ca69a6dabc 100644 --- a/sync/blocksync/syncer.go +++ b/sync/blocksync/syncer.go @@ -78,11 +78,6 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { // first, check for blocks already available on disk so we don't // request them from peers. for blocksToFetch > 0 { - // Check for context cancellation before checking each block. - if err := ctx.Err(); err != nil { - return err - } - blk := rawdb.ReadBlock(s.db, nextHash, nextHeight) if blk == nil { // block was not found @@ -99,11 +94,6 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { // them to disk. 
batch := s.db.NewBatch() for fetched := uint64(0); fetched < blocksToFetch && (nextHash != common.Hash{}); { - // Check for context cancellation before making network requests. - if err := ctx.Err(); err != nil { - return err - } - log.Info("fetching blocks from peer", "fetched", fetched, "total", blocksToFetch) blocks, err := s.client.GetBlocks(ctx, nextHash, nextHeight, blocksPerRequest) if err != nil { From 5131d0c96a474a5a003400a75b7cd004873d7966 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Fri, 21 Nov 2025 16:37:36 +0200 Subject: [PATCH 13/14] feat(statesync): introduce Finalizer interface for syncer cleanup Add a `Finalizer` interface to provide explicit cleanup operations for syncers. This ensures cleanup (like flushing batches to disk) is performed reliably even on cancellation or early returns. - Add `Finalizer` interface to `sync/types.go` for explicit cleanup. - Attach `Finalize()` in `CodeQueue` that finalizes code fetching to this new interface. - Gather finalization logic in a `Finalize()` for StateSyncer to flush in-progress trie batches. - Implement `Finalize()` for AtomicSyncer to commit pending database changes. - Add `FinalizeAll()` to SyncerRegistry with defer to ensure cleanup runs. - Remove `OnFailure` callback mechanism (replaced by `Finalizer`). resolves #1089 Signed-off-by: Tsvetan Dimitrov (tsvetan.dimitrov23@gmail.com) --- plugin/evm/atomic/sync/syncer.go | 12 +++++++++++- plugin/evm/vmsync/registry.go | 16 ++++++++++++++++ sync/blocksync/syncer.go | 10 ---------- sync/client/leaf_syncer.go | 9 ++++----- sync/statesync/code_queue.go | 6 ++++++ sync/statesync/state_syncer.go | 13 ++++++------- sync/statesync/trie_sync_tasks.go | 6 +++++- sync/types.go | 7 +++++++ 8 files changed, 55 insertions(+), 24 deletions(-) diff --git a/plugin/evm/atomic/sync/syncer.go b/plugin/evm/atomic/sync/syncer.go index 89d44247f4..f7a368f19c 100644 --- a/plugin/evm/atomic/sync/syncer.go +++ b/plugin/evm/atomic/sync/syncer.go @@ -33,6 +33,7 @@ const ( var ( _ sync.Syncer = (*Syncer)(nil) _ syncclient.LeafSyncTask = (*syncerLeafTask)(nil) + _ sync.Finalizer = (*Syncer)(nil) errTargetHeightRequired = errors.New("target height must be > 0") ) @@ -125,7 +126,6 @@ func NewSyncer(client syncclient.LeafClient, db *versiondb.Database, atomicTrie syncer.syncer = syncclient.NewCallbackLeafSyncer(client, tasks, &syncclient.LeafSyncerConfig{ RequestSize: cfg.requestSize, NumWorkers: cfg.numWorkers, - OnFailure: func() {}, // No-op since we flush progress to disk at the regular commit interval. }) return syncer, nil @@ -146,6 +146,16 @@ func (s *Syncer) Sync(ctx context.Context) error { return s.syncer.Sync(ctx) } +// Finalize commits any pending database changes to disk. +// This ensures that even if the sync is cancelled or fails, we preserve +// the progress up to the last fully synced height. +func (s *Syncer) Finalize() error { + if s.db == nil { + return nil + } + return s.db.Commit() +} + // addZeroes returns the big-endian representation of `height`, prefixed with [common.HashLength] zeroes. func addZeroes(height uint64) []byte { // Key format is [height(8 bytes)][blockchainID(32 bytes)]. Start should be the diff --git a/plugin/evm/vmsync/registry.go b/plugin/evm/vmsync/registry.go index 0aab6c6f61..d32d82a02a 100644 --- a/plugin/evm/vmsync/registry.go +++ b/plugin/evm/vmsync/registry.go @@ -53,6 +53,10 @@ func (r *SyncerRegistry) Register(syncer syncpkg.Syncer) error { // RunSyncerTasks executes all registered syncers synchronously. 
func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syncable) error { + // Ensure finalization runs regardless of how this function exits. + // This guarantees cleanup even on early returns or panics. + defer r.FinalizeAll(summary) + // Early return if context is already canceled (e.g., during shutdown). if err := ctx.Err(); err != nil { return err @@ -102,3 +106,15 @@ func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncabl return g } + +// FinalizeAll iterates over all registered syncers and calls Finalize on those that implement the Finalizer interface. +// Errors are logged but not returned to ensure best-effort cleanup of all syncers. +func (r *SyncerRegistry) FinalizeAll(summary message.Syncable) { + for _, task := range r.syncers { + if f, ok := task.syncer.(syncpkg.Finalizer); ok { + if err := f.Finalize(); err != nil { + log.Error("failed to finalize syncer", "syncer", task.name, "err", err, "summary", summary.GetBlockHash().Hex(), "height", summary.Height()) + } + } + } +} diff --git a/sync/blocksync/syncer.go b/sync/blocksync/syncer.go index 4fee881dcc..ca69a6dabc 100644 --- a/sync/blocksync/syncer.go +++ b/sync/blocksync/syncer.go @@ -78,11 +78,6 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { // first, check for blocks already available on disk so we don't // request them from peers. for blocksToFetch > 0 { - // Check for context cancellation before checking each block. - if err := ctx.Err(); err != nil { - return err - } - blk := rawdb.ReadBlock(s.db, nextHash, nextHeight) if blk == nil { // block was not found @@ -99,11 +94,6 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { // them to disk. batch := s.db.NewBatch() for fetched := uint64(0); fetched < blocksToFetch && (nextHash != common.Hash{}); { - // Check for context cancellation before making network requests. 
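// Editor's note: a condensed, self-contained sketch of the Finalizer pattern
// this patch introduces (interface in sync/types.go, best-effort cleanup in
// SyncerRegistry.FinalizeAll above). The concrete types below are illustrative
// stand-ins, not code from this repository.
package main

import (
	"context"
	"fmt"
)

// Finalizer mirrors the interface added in sync/types.go.
type Finalizer interface {
	Finalize() error
}

// Syncer is a trimmed-down stand-in for the real sync.Syncer interface.
type Syncer interface {
	Sync(ctx context.Context) error
	ID() string
}

// batchingSyncer stands in for a syncer that buffers progress it must flush.
type batchingSyncer struct{ pending int }

func (b *batchingSyncer) ID() string { return "batching" }

func (b *batchingSyncer) Sync(ctx context.Context) error {
	b.pending++ // pretend some work was buffered before cancellation
	return ctx.Err()
}

// Finalize flushes buffered progress; it runs even when Sync was cancelled.
func (b *batchingSyncer) Finalize() error {
	fmt.Printf("flushed %d pending item(s)\n", b.pending)
	return nil
}

// finalizeAll mirrors SyncerRegistry.FinalizeAll: cleanup is best-effort and an
// error from one syncer does not stop finalization of the others.
func finalizeAll(syncers []Syncer) {
	for _, s := range syncers {
		if f, ok := s.(Finalizer); ok {
			if err := f.Finalize(); err != nil {
				fmt.Printf("finalize failed for %s: %v\n", s.ID(), err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown before syncing completes

	syncers := []Syncer{&batchingSyncer{}}
	defer finalizeAll(syncers) // runs regardless of how the sync loop exits

	for _, s := range syncers {
		if err := s.Sync(ctx); err != nil {
			fmt.Printf("sync %s stopped: %v\n", s.ID(), err)
		}
	}
}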
- if err := ctx.Err(); err != nil { - return err - } - log.Info("fetching blocks from peer", "fetched", fetched, "total", blocksToFetch) blocks, err := s.client.GetBlocks(ctx, nextHash, nextHeight, blocksPerRequest) if err != nil { diff --git a/sync/client/leaf_syncer.go b/sync/client/leaf_syncer.go index 8f7a1b8626..b95708781c 100644 --- a/sync/client/leaf_syncer.go +++ b/sync/client/leaf_syncer.go @@ -37,7 +37,6 @@ type LeafSyncTask interface { type LeafSyncerConfig struct { RequestSize uint16 // Number of leafs to request from a peer at a time NumWorkers int // Number of workers to process leaf sync tasks - OnFailure func() // Callback for handling errors during sync } type CallbackLeafSyncer struct { @@ -159,9 +158,9 @@ func (c *CallbackLeafSyncer) Sync(ctx context.Context) error { }) } - err := eg.Wait() - if err != nil { - c.config.OnFailure() + if err := eg.Wait(); err != nil { + return err } - return err + + return nil } diff --git a/sync/statesync/code_queue.go b/sync/statesync/code_queue.go index e0bda7c905..1845267ec7 100644 --- a/sync/statesync/code_queue.go +++ b/sync/statesync/code_queue.go @@ -15,11 +15,15 @@ import ( "github.com/ava-labs/libevm/libevm/options" "github.com/ava-labs/coreth/plugin/evm/customrawdb" + + syncpkg "github.com/ava-labs/coreth/sync" ) const defaultQueueCapacity = 5000 var ( + _ syncpkg.Finalizer = (*CodeQueue)(nil) + errFailedToAddCodeHashesToQueue = errors.New("failed to add code hashes to queue") errFailedToFinalizeCodeQueue = errors.New("failed to finalize code queue") ) @@ -175,6 +179,8 @@ func (q *CodeQueue) init() error { if err != nil { return fmt.Errorf("unable to recover previous sync state: %w", err) } + // Use context.Background() since init() runs during construction before sync starts. + // The channel is empty, so sends won't block. Shutdown is handled via q.quit in AddCode. if err := q.AddCode(context.Background(), dbCodeHashes); err != nil { return fmt.Errorf("unable to resume previous sync: %w", err) } diff --git a/sync/statesync/state_syncer.go b/sync/statesync/state_syncer.go index 9976aceacf..c4e98ec2e6 100644 --- a/sync/statesync/state_syncer.go +++ b/sync/statesync/state_syncer.go @@ -106,7 +106,6 @@ func NewSyncer(client syncclient.Client, db ethdb.Database, root common.Hash, co ss.syncer = syncclient.NewCallbackLeafSyncer(client, ss.segments, &syncclient.LeafSyncerConfig{ RequestSize: leafsRequestSize, NumWorkers: defaultNumWorkers, - OnFailure: ss.onSyncFailure, }) if codeQueue == nil { @@ -301,19 +300,19 @@ func (t *stateSync) removeTrieInProgress(root common.Hash) (int, error) { return len(t.triesInProgress), nil } -// onSyncFailure is called if the sync fails, this writes all -// batches of in-progress trie segments to disk to have maximum -// progress to restore. -func (t *stateSync) onSyncFailure() { +// Finalize checks if there are any in-progress tries and flushes their batches to disk +// to preserve progress. This is called by the syncer registry on sync failure or cancellation. 
+func (t *stateSync) Finalize() error { t.lock.RLock() defer t.lock.RUnlock() for _, trie := range t.triesInProgress { for _, segment := range trie.segments { if err := segment.batch.Write(); err != nil { - log.Error("failed to write segment batch on sync failure", "err", err) - return + log.Error("failed to write segment batch on finalize", "err", err) + return err } } } + return nil } diff --git a/sync/statesync/trie_sync_tasks.go b/sync/statesync/trie_sync_tasks.go index 455f75e00e..556254a418 100644 --- a/sync/statesync/trie_sync_tasks.go +++ b/sync/statesync/trie_sync_tasks.go @@ -143,9 +143,13 @@ func (s *storageTrieTask) OnFinish() error { return s.sync.onStorageTrieFinished(s.root) } -func (s *storageTrieTask) OnLeafs(_ context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error { +func (s *storageTrieTask) OnLeafs(ctx context.Context, db ethdb.KeyValueWriter, keys, vals [][]byte) error { // persists the trie leafs to the snapshot for all accounts associated with this root for _, account := range s.accounts { + // Check context cancellation before processing each account to allow early exit during shutdown. + if err := ctx.Err(); err != nil { + return err + } for i, key := range keys { rawdb.WriteStorageSnapshot(db, account, common.BytesToHash(key), vals[i]) } diff --git a/sync/types.go b/sync/types.go index d6e9005b06..93a3da652e 100644 --- a/sync/types.go +++ b/sync/types.go @@ -31,6 +31,13 @@ type Syncer interface { ID() string } +// Finalizer provides a mechanism to perform cleanup operations after a sync operation. +// This is useful for handling inflight requests, flushing to disk, or other cleanup tasks. +type Finalizer interface { + // Finalize performs any necessary cleanup operations. + Finalize() error +} + // SummaryProvider is an interface for providing state summaries. type SummaryProvider interface { StateSummaryAtBlock(ethBlock *types.Block) (block.StateSummary, error) From 69fb806b36ec59ccc93e8ee8b09d7da0801389fb Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Fri, 21 Nov 2025 17:11:52 +0200 Subject: [PATCH 14/14] fix(registry): ctx shadowing --- plugin/evm/vmsync/registry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/evm/vmsync/registry.go b/plugin/evm/vmsync/registry.go index 0aab6c6f61..7c38ae2d19 100644 --- a/plugin/evm/vmsync/registry.go +++ b/plugin/evm/vmsync/registry.go @@ -73,7 +73,7 @@ func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syn // whose Wait() completes when all syncers exit. The context returned will be // cancelled when any syncer fails, propagating shutdown to the others. func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncable) *errgroup.Group { - g, ctx := errgroup.WithContext(ctx) + g, egCtx := errgroup.WithContext(ctx) if len(r.syncers) == 0 { return g @@ -85,7 +85,7 @@ func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncabl for _, task := range r.syncers { g.Go(func() error { log.Info("starting syncer", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight) - if err := task.syncer.Sync(ctx); err != nil { + if err := task.syncer.Sync(egCtx); err != nil { // Context cancellation during shutdown is expected. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { log.Info("syncer cancelled", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight)
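// Editor's note: a self-contained sketch of the run/cancellation pattern the
// series converges on: errgroup.WithContext for fan-out, a separately named
// errgroup context passed to each worker so the caller's ctx is not shadowed
// (the fix in this last patch), and errors.Is to treat context.Canceled and
// context.DeadlineExceeded as expected shutdown rather than failure. The names
// below are illustrative, not code from this repository.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// runAll runs every worker with the errgroup-derived context, so a failure in
// one worker cancels the others while the caller's ctx stays untouched.
func runAll(ctx context.Context, workers []func(context.Context) error) error {
	g, egCtx := errgroup.WithContext(ctx)
	for _, w := range workers {
		g.Go(func() error { return w(egCtx) })
	}
	return g.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers := []func(context.Context) error{
		func(ctx context.Context) error { <-ctx.Done(); return ctx.Err() },
	}

	err := runAll(ctx, workers)
	switch {
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		fmt.Println("shutdown (expected):", err) // would be logged at INFO
	case err != nil:
		fmt.Println("sync failed:", err) // would be logged at ERROR
	default:
		fmt.Println("all workers completed")
	}
}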