Skip to content
This repository was archived by the owner on Nov 25, 2025. It is now read-only.

Commit 84766a8

Browse files
authored
fix(vmsync,statesync): ensure syncers detect cancellation during graceful shutdown and fix registry cancellation logging (#1411)
1 parent 63d7606 commit 84766a8

File tree

11 files changed

+300
-103
lines changed

11 files changed

+300
-103
lines changed

plugin/evm/atomic/sync/syncer.go

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -156,7 +156,11 @@ func addZeroes(height uint64) []byte {
156156
}
157157

158158
// onLeafs is the callback for the leaf syncer, which will insert the key-value pairs into the trie.
159-
func (s *Syncer) onLeafs(keys [][]byte, values [][]byte) error {
159+
func (s *Syncer) onLeafs(ctx context.Context, keys [][]byte, values [][]byte) error {
160+
if err := ctx.Err(); err != nil {
161+
return err
162+
}
163+
160164
for i, key := range keys {
161165
if len(key) != atomicstate.TrieKeyLength {
162166
return fmt.Errorf("unexpected key len (%d) in atomic trie sync", len(key))
@@ -239,6 +243,6 @@ func (a *syncerLeafTask) OnFinish(context.Context) error { return a.syncer.onFin
239243
func (*syncerLeafTask) OnStart() (bool, error) { return false, nil }
240244
func (a *syncerLeafTask) Root() common.Hash { return a.syncer.targetRoot }
241245
func (*syncerLeafTask) Account() common.Hash { return common.Hash{} }
242-
func (a *syncerLeafTask) OnLeafs(keys, vals [][]byte) error {
243-
return a.syncer.onLeafs(keys, vals)
246+
func (a *syncerLeafTask) OnLeafs(ctx context.Context, keys, vals [][]byte) error {
247+
return a.syncer.onLeafs(ctx, keys, vals)
244248
}

plugin/evm/vmsync/doubles_test.go

Lines changed: 18 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -27,15 +27,13 @@ func (FuncSyncer) ID() string { return "test_id" }
2727

2828
var _ syncpkg.Syncer = FuncSyncer{}
2929

30-
// NewBarrierSyncer returns a syncer that, upon entering Sync, calls wg.Done() to
31-
// signal it has started, then blocks until either:
32-
// - `releaseCh` is closed, returning nil; or
33-
// - `ctx` is canceled, returning ctx.Err.
34-
//
35-
// This acts as a barrier to coordinate test goroutines.
36-
func NewBarrierSyncer(wg *sync.WaitGroup, releaseCh <-chan struct{}) FuncSyncer {
30+
// NewBarrierSyncer returns a syncer that signals startedWG.Done() when Sync begins,
31+
// then blocks until releaseCh is closed (returns nil) or ctx is canceled (returns ctx.Err).
32+
func NewBarrierSyncer(startedWG *sync.WaitGroup, releaseCh <-chan struct{}) FuncSyncer {
3733
return FuncSyncer{fn: func(ctx context.Context) error {
38-
wg.Done()
34+
if startedWG != nil {
35+
startedWG.Done()
36+
}
3937
select {
4038
case <-releaseCh:
4139
return nil
@@ -45,10 +43,13 @@ func NewBarrierSyncer(wg *sync.WaitGroup, releaseCh <-chan struct{}) FuncSyncer
4543
}}
4644
}
4745

48-
// NewErrorSyncer returns a syncer that waits until either `trigger` is closed
49-
// (then returns `errToReturn`) or `ctx` is canceled (then returns ctx.Err).
50-
func NewErrorSyncer(trigger <-chan struct{}, errToReturn error) FuncSyncer {
46+
// NewErrorSyncer returns a syncer that signals startedWG.Done() when Sync begins,
47+
// then blocks until trigger is closed (returns errToReturn) or ctx is canceled (returns ctx.Err).
48+
func NewErrorSyncer(startedWG *sync.WaitGroup, trigger <-chan struct{}, errToReturn error) FuncSyncer {
5149
return FuncSyncer{fn: func(ctx context.Context) error {
50+
if startedWG != nil {
51+
startedWG.Done()
52+
}
5253
select {
5354
case <-trigger:
5455
return errToReturn
@@ -58,18 +59,15 @@ func NewErrorSyncer(trigger <-chan struct{}, errToReturn error) FuncSyncer {
5859
}}
5960
}
6061

61-
// NewCancelAwareSyncer closes `started` as soon as Sync begins, then waits for
62-
// either:
63-
// - `ctx` cancellation: closes `canceled` and returns ctx.Err; or
64-
// - `timeout` elapsing: returns an error indicating a timeout.
65-
//
66-
// Useful for asserting that cancellation propagates to the syncer under test.
67-
func NewCancelAwareSyncer(started, canceled chan struct{}, timeout time.Duration) FuncSyncer {
62+
// NewCancelAwareSyncer returns a syncer that signals startedWG.Done() when Sync begins,
63+
// then blocks until ctx is canceled (returns ctx.Err) or timeout elapses (returns timeout error).
64+
func NewCancelAwareSyncer(startedWG *sync.WaitGroup, timeout time.Duration) FuncSyncer {
6865
return FuncSyncer{fn: func(ctx context.Context) error {
69-
close(started)
66+
if startedWG != nil {
67+
startedWG.Done()
68+
}
7069
select {
7170
case <-ctx.Done():
72-
close(canceled)
7371
return ctx.Err()
7472
case <-time.After(timeout):
7573
return errors.New("syncer timed out waiting for cancellation")

plugin/evm/vmsync/registry.go

Lines changed: 34 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -13,15 +13,15 @@ import (
1313

1414
"github.com/ava-labs/coreth/plugin/evm/message"
1515

16-
synccommon "github.com/ava-labs/coreth/sync"
16+
syncpkg "github.com/ava-labs/coreth/sync"
1717
)
1818

1919
var errSyncerAlreadyRegistered = errors.New("syncer already registered")
2020

2121
// SyncerTask represents a single syncer with its name for identification.
2222
type SyncerTask struct {
2323
name string
24-
syncer synccommon.Syncer
24+
syncer syncpkg.Syncer
2525
}
2626

2727
// SyncerRegistry manages a collection of syncers that are run concurrently as a group.
@@ -39,7 +39,7 @@ func NewSyncerRegistry() *SyncerRegistry {
3939

4040
// Register adds a syncer to the registry.
4141
// Returns an error if a syncer with the same name is already registered.
42-
func (r *SyncerRegistry) Register(syncer synccommon.Syncer) error {
42+
func (r *SyncerRegistry) Register(syncer syncpkg.Syncer) error {
4343
id := syncer.ID()
4444
if r.registeredNames[id] {
4545
return fmt.Errorf("%w with id '%s'", errSyncerAlreadyRegistered, id)
@@ -51,22 +51,46 @@ func (r *SyncerRegistry) Register(syncer synccommon.Syncer) error {
5151
return nil
5252
}
5353

54-
// RunSyncerTasks executes all registered syncers.
55-
// The provided summary is used only for logging to decouple from concrete client types.
54+
// RunSyncerTasks executes all registered syncers synchronously.
5655
func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syncable) error {
56+
// Early return if context is already canceled (e.g., during shutdown).
57+
if err := ctx.Err(); err != nil {
58+
return err
59+
}
60+
61+
g := r.StartAsync(ctx, summary)
62+
63+
if err := g.Wait(); err != nil {
64+
return err
65+
}
66+
67+
log.Info("all syncers completed successfully", "count", len(r.syncers), "summary", summary.GetBlockHash().Hex())
68+
69+
return nil
70+
}
71+
72+
// StartAsync launches all registered syncers and returns an [errgroup.Group]
73+
// whose Wait() completes when all syncers exit. The context returned will be
74+
// cancelled when any syncer fails, propagating shutdown to the others.
75+
func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncable) *errgroup.Group {
76+
g, egCtx := errgroup.WithContext(ctx)
77+
5778
if len(r.syncers) == 0 {
58-
return nil
79+
return g
5980
}
6081

6182
summaryBlockHashHex := summary.GetBlockHash().Hex()
6283
blockHeight := summary.Height()
6384

64-
g, ctx := errgroup.WithContext(ctx)
65-
6685
for _, task := range r.syncers {
6786
g.Go(func() error {
6887
log.Info("starting syncer", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight)
69-
if err := task.syncer.Sync(ctx); err != nil {
88+
if err := task.syncer.Sync(egCtx); err != nil {
89+
// Context cancellation during shutdown is expected.
90+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
91+
log.Info("syncer cancelled", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight)
92+
return err
93+
}
7094
log.Error("failed syncing", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight, "err", err)
7195
return fmt.Errorf("%s failed: %w", task.name, err)
7296
}
@@ -76,11 +100,5 @@ func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syn
76100
})
77101
}
78102

79-
if err := g.Wait(); err != nil {
80-
return err
81-
}
82-
83-
log.Info("all syncers completed successfully", "count", len(r.syncers), "summary", summaryBlockHashHex)
84-
85-
return nil
103+
return g
86104
}

0 commit comments

Comments
 (0)