Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
a2b5450
feat(sync/vm): parallelize syncer execution with errgroup
powerslider Aug 19, 2025
2b8b9e3
Merge branch 'master' into powerslider/1130-concurrent-syncers
powerslider Aug 19, 2025
13f7c4a
test(sync/vm): simplify registry tests using utilstest
powerslider Aug 20, 2025
292f8b2
test(sync/vm): remove unnecessary test cleanups
powerslider Aug 20, 2025
47e71af
test: forgot to add FuncSyncer implementations
powerslider Aug 20, 2025
f6174a4
test: improve comments
powerslider Aug 21, 2025
9772d76
test: use require.FailNow instead of t.Fatal
powerslider Aug 21, 2025
f3e03b8
test: make the base test timeout be derived by t.Deadline
powerslider Aug 21, 2025
0d62f6d
Merge branch 'master' into powerslider/1130-concurrent-syncers
powerslider Aug 21, 2025
90d2911
fix: gci formatting
powerslider Aug 21, 2025
5c49827
fix: do not assume on first syncer failing
powerslider Aug 21, 2025
40b7313
fix: more remarks
powerslider Aug 22, 2025
a6497d2
Merge branch 'master' into powerslider/1130-concurrent-syncers
powerslider Aug 22, 2025
489cb79
feat(sync): decouple code syncer from state syncer and run concurrently
powerslider Aug 22, 2025
8645bc3
fix: split table test into separate tests to reduce complexity
powerslider Aug 25, 2025
5436497
Merge branch 'powerslider/1130-concurrent-syncers' into powerslider/1…
powerslider Aug 25, 2025
0451ace
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Aug 27, 2025
d4be816
chore: refactor and rename code syncer interfaces
powerslider Aug 27, 2025
e569f56
fix: more renaming
powerslider Aug 27, 2025
f86373b
fix: goimports
powerslider Aug 27, 2025
bab7f02
fix: gci format
powerslider Aug 27, 2025
49f3c53
chore(sync): add Ready() to CodeFetcher and gate state sync startup
powerslider Aug 27, 2025
c422987
fix: broken test and lint issue
powerslider Aug 27, 2025
19fe953
chore: rename FinishCodeCollection to Finalize
powerslider Aug 27, 2025
b3c04be
docs: document CodeFetcher
powerslider Aug 27, 2025
9f2f658
fix: ignore require_no_error_inline_func due to messing with the cont…
powerslider Aug 27, 2025
ed714de
fix: workaround for require_no_error_inline_func due to messing with …
powerslider Aug 27, 2025
78f536e
fix: modify test_require_no_error_inline_func to flag only single-LHS…
powerslider Aug 27, 2025
b7d98a3
fix: remove atomic name in client.go
powerslider Aug 28, 2025
0ec9d1d
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Aug 28, 2025
f20fe23
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Aug 29, 2025
01aeeee
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 1, 2025
14fe808
feat(sync): add Name() and ID() to Syncer interface to make syncer im…
powerslider Sep 2, 2025
5063873
feat(statesync): introduce CodeFetcherQueue and decouple CodeSyncer c…
powerslider Sep 3, 2025
c7b86db
fix: move getCodeToFetchFromDB to code fetcher
powerslider Sep 3, 2025
e8fee23
fix: add CodeFetcherOption type
powerslider Sep 3, 2025
3a447b6
fix: some formatting
powerslider Sep 3, 2025
bd5be21
fix: PR remarks on tests
powerslider Sep 3, 2025
af55380
fix: more PR remarks on tests
powerslider Sep 4, 2025
1f971aa
fix: add ability to track in-flight AddCode calls to coordinate clean…
powerslider Sep 4, 2025
401b564
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 5, 2025
5be557a
refactor(sync/statesync): harden code fetcher lifecycle, improve shut…
powerslider Sep 5, 2025
ee8b54f
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 9, 2025
51873a1
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 11, 2025
2097dbc
fix: apply recommended optimizations
powerslider Sep 12, 2025
4c64aa5
feat(sync/statesync): adopt single forwarder pattern for CodeQueue
powerslider Sep 12, 2025
8492d1e
fix: channel close races
powerslider Sep 15, 2025
af18251
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 15, 2025
63b8dbd
fix: linter error and simplify in tests
powerslider Sep 15, 2025
3151ef3
fix: some naming changes
powerslider Sep 15, 2025
be20084
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 16, 2025
aed34f1
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 17, 2025
0b35e75
refactor(plugin/evm): move syncer client/registry to plugin/evm/vmsyn…
powerslider Sep 17, 2025
5487e18
Remove libevm `cmd/*` packages. (#1224)
JonathanOppenheimer Sep 17, 2025
ed6bb70
Remove libevm `signer` package (#1225)
JonathanOppenheimer Sep 17, 2025
35e6b12
Remove duplicate cancun checks (#1223)
alarso16 Sep 17, 2025
446f283
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 18, 2025
f8a7e4c
fix: some naming changes
powerslider Sep 23, 2025
adadd3c
refactor: simplify `CodeQueue` fan-in (#1254)
powerslider Sep 23, 2025
e5d0c7d
refactor(statesync): simplify CodeQueue, remove auto-init, rename cap…
powerslider Sep 23, 2025
76e1ce0
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 23, 2025
41f199e
refactor(statesync): move code deduplication from queue (producer) to…
powerslider Sep 23, 2025
8f779fd
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 23, 2025
dc046e8
fix: some naming changes
powerslider Sep 23, 2025
3e9f945
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 24, 2025
416a906
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 24, 2025
54c2592
fix(statesync): prevent data race by serializing out close with enqueues
powerslider Sep 24, 2025
6b1a6e3
fix(statesync): eliminate send/close race in code queue
powerslider Sep 24, 2025
3b596bb
fix: remove unused method
powerslider Sep 24, 2025
ebfbed4
Merge branch 'master' into powerslider/1139-code-syncer-decoupling
powerslider Sep 24, 2025
aea5f06
fix(statesync): serialize code ownership and release after durable co…
powerslider Sep 25, 2025
c895e3d
fix: handle error for `Finalize` call
powerslider Sep 25, 2025
a40996b
fix(statesync): make Finalize idempotent via CAS; error only on quit
powerslider Sep 25, 2025
f535b36
refactor(statesync): remove `CodeRequestQueue` interface and use `Cod…
powerslider Sep 25, 2025
1aad3e7
refactor(statesync): simplify CodeQueue by removing AddCode state che…
powerslider Sep 25, 2025
13d652e
fix(statesync): add fix to align with `AddCode` contract
powerslider Sep 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions sync/statesync/code_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ type codeSyncer struct {
done <-chan struct{}
}

// newCodeSyncer returns a code syncer that will sync code bytes from the network in a separate thread.
func newCodeSyncer(client statesyncclient.Client, db ethdb.Database, config Config) (*codeSyncer, error) {
// NewCodeSyncer allows external packages (e.g., registry wiring) to create a code syncer as a separate task and
// inject it as a [CodeHashSink] into the state syncer.
func NewCodeSyncer(client statesyncclient.Client, db ethdb.Database, config Config) (*codeSyncer, error) {
cfg := config.WithUnsetDefaults()

dbCodeHashes, err := getCodeToFetchFromDB(db)
Expand Down Expand Up @@ -217,11 +218,11 @@ func (c *codeSyncer) addCode(codeHashes []common.Hash) error {
return c.addHashesToQueue(selectedCodeHashes)
}

// notifyAccountTrieCompleted notifies the code syncer that there will be no more incoming
// code hashes from syncing the account trie, so it only needs to compelete its outstanding
// work.
// AccountTrieCompleted implements CodeHashSink by signaling no further code hashes will be added.
// Notifies the code syncer that there will be no more incoming code hashes from syncing the account trie,
// so it only needs to complete its outstanding work.
// Note: this allows the worker threads to exit and return a nil error.
func (c *codeSyncer) notifyAccountTrieCompleted() {
func (c *codeSyncer) AccountTrieCompleted() {
<-c.open // The code syncer must queue the previous code from the db first
close(c.codeHashes)
}
Expand Down
7 changes: 4 additions & 3 deletions sync/statesync/code_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ func testCodeSyncer(t *testing.T, test codeSyncerTest) {
clientDB = rawdb.NewMemoryDatabase()
}

codeSyncer, err := newCodeSyncer(
cfg := NewDefaultConfig(testRequestSize)
codeSyncer, err := NewCodeSyncer(
mockClient,
clientDB,
NewDefaultConfig(testRequestSize),
cfg,
)
require.NoError(t, err)
if test.setupCodeSyncer != nil {
Expand All @@ -69,7 +70,7 @@ func testCodeSyncer(t *testing.T, test codeSyncerTest) {
require.ErrorIs(t, err, test.err)
}
}
codeSyncer.notifyAccountTrieCompleted()
codeSyncer.AccountTrieCompleted()
}()

err = codeSyncer.Sync(context.Background())
Expand Down
42 changes: 28 additions & 14 deletions sync/statesync/state_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package statesync

import (
"context"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -68,6 +69,21 @@ func (c Config) WithUnsetDefaults() Config {
return out
}

// CodeHashSink is a minimal interface for accepting discovered code hashes
// and signaling when no more code hashes will be produced from the account trie.
// Implemented by codeSyncer to support decoupled wiring.
type CodeHashSink interface {
AddCode(codeHashes []common.Hash) error
AccountTrieCompleted()
}

// CodeSyncerTask is implemented by the concrete code syncer and combines
// the sink and runnable task behaviors so callers can use a single type.
type CodeSyncerTask interface {
CodeHashSink
synccommon.Syncer
}

// stateSync keeps the state of the entire state sync operation.
type stateSync struct {
db ethdb.Database // database we are syncing
Expand All @@ -76,10 +92,10 @@ type stateSync struct {
snapshot snapshot.SnapshotIterable // used to access the database we are syncing as a snapshot.
batchSize uint // write batches when they reach this size

segments chan syncclient.LeafSyncTask // channel of tasks to sync
syncer *syncclient.CallbackLeafSyncer // performs the sync, looping over each task's range and invoking specified callbacks
codeSyncer *codeSyncer // manages the asynchronous download and batching of code hashes
trieQueue *trieQueue // manages a persistent list of storage tries we need to sync and any segments that are created for them
segments chan syncclient.LeafSyncTask // channel of tasks to sync
syncer *syncclient.CallbackLeafSyncer // performs the sync, looping over each task's range and invoking specified callbacks
codeSink CodeHashSink // sink that manages the asynchronous download and batching of code hashes
trieQueue *trieQueue // manages a persistent list of storage tries we need to sync and any segments that are created for them

// track the main account trie specifically to commit its root at the end of the operation
mainTrie *trieToSync
Expand All @@ -95,7 +111,7 @@ type stateSync struct {
stats *trieSyncStats
}

func NewSyncer(client syncclient.Client, db ethdb.Database, root common.Hash, config Config) (synccommon.Syncer, error) {
func NewSyncer(client syncclient.Client, db ethdb.Database, root common.Hash, sink CodeHashSink, config Config) (synccommon.Syncer, error) {
cfg := config.WithUnsetDefaults()

ss := &stateSync{
Expand Down Expand Up @@ -125,12 +141,12 @@ func NewSyncer(client syncclient.Client, db ethdb.Database, root common.Hash, co
OnFailure: ss.onSyncFailure,
})

var err error
ss.codeSyncer, err = newCodeSyncer(client, db, cfg)
if err != nil {
return nil, err
if sink == nil {
return nil, errors.New("CodeHashSink must be provided")
}
ss.codeSink = sink

var err error
ss.trieQueue = NewTrieQueue(db)
if err := ss.trieQueue.clearIfRootDoesNotMatch(ss.root); err != nil {
return nil, err
Expand Down Expand Up @@ -171,7 +187,7 @@ func (t *stateSync) onStorageTrieFinished(root common.Hash) error {

// onMainTrieFinished is called after the main trie finishes syncing.
func (t *stateSync) onMainTrieFinished() error {
t.codeSyncer.notifyAccountTrieCompleted()
t.codeSink.AccountTrieCompleted()

// count the number of storage tries we need to sync for eta purposes.
numStorageTries, err := t.trieQueue.countTries()
Expand Down Expand Up @@ -255,7 +271,7 @@ func (t *stateSync) storageTrieProducer(ctx context.Context) error {
}

func (t *stateSync) Sync(ctx context.Context) error {
// Start the code syncer and leaf syncer.
// Start the leaf syncer and storage trie producer.
eg, egCtx := errgroup.WithContext(ctx)

eg.Go(func() error {
Expand All @@ -264,9 +280,7 @@ func (t *stateSync) Sync(ctx context.Context) error {
}
return t.onSyncComplete()
})
eg.Go(func() error {
return t.codeSyncer.Sync(egCtx)
})
// Note: code syncer is no longer started here. It should be run by the registry if registered separately.
eg.Go(func() error {
return t.storageTrieProducer(egCtx)
})
Expand Down
15 changes: 12 additions & 3 deletions sync/statesync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ava-labs/libevm/trie"
"github.com/ava-labs/libevm/triedb"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/ava-labs/coreth/core/state/snapshot"
"github.com/ava-labs/coreth/plugin/evm/customrawdb"
Expand Down Expand Up @@ -57,13 +58,21 @@ func testSync(t *testing.T, test syncTest) {
mockClient.GetLeafsIntercept = test.GetLeafsIntercept
mockClient.GetCodeIntercept = test.GetCodeIntercept

s, err := NewSyncer(mockClient, clientDB, root, Config{
cfg := Config{
BatchSize: 1000, // Use a lower batch size in order to get test coverage of batches being written early.
RequestSize: testRequestSize,
})
}
codeSyncer, err := NewCodeSyncer(mockClient, clientDB, cfg)
require.NoError(t, err, "failed to create code syncer")
stateSyncer, err := NewSyncer(mockClient, clientDB, root, codeSyncer, cfg)
require.NoError(t, err, "failed to create state syncer")

err = s.Sync(ctx)
// Run both syncers concurrently and wait for the first error.
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error { return codeSyncer.Sync(egCtx) })
eg.Go(func() error { return stateSyncer.Sync(egCtx) })

err = eg.Wait()
require.ErrorIs(t, err, test.expectedError, "unexpected error during sync")

// Only assert database consistency if the sync was expected to succeed.
Expand Down
4 changes: 2 additions & 2 deletions sync/statesync/trie_sync_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func (m *mainTrieTask) OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) err
codeHashes = append(codeHashes, codeHash)
}
}
// Add collected code hashes to the code syncer.
return m.sync.codeSyncer.AddCode(codeHashes)
// Add collected code hashes to the sink.
return m.sync.codeSink.AddCode(codeHashes)
}

type storageTrieTask struct {
Expand Down
73 changes: 73 additions & 0 deletions sync/synctest/synctest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

// Package utilstest provides small testing helpers used across the codebase.
// It includes lightweight syncer adapters and utilities for coordinating
// goroutines under test.
package synctest

import (
"context"
"errors"
"sync"
"time"
)

// FuncSyncer adapts a function to the simple Syncer shape used in tests. It is
// useful for defining small, behavior-driven syncers inline.
type FuncSyncer struct {
fn func(ctx context.Context) error
}

// Sync calls the wrapped function and returns its result.
func (f FuncSyncer) Sync(ctx context.Context) error { return f.fn(ctx) }

// NewBarrierSyncer returns a syncer that, upon entering Sync, calls wg.Done() to
// signal it has started, then blocks until either:
// - `releaseCh` is closed, returning nil; or
// - `ctx` is canceled, returning ctx.Err.
//
// This acts as a barrier to coordinate test goroutines.
func NewBarrierSyncer(wg *sync.WaitGroup, releaseCh <-chan struct{}) FuncSyncer {
return FuncSyncer{fn: func(ctx context.Context) error {
wg.Done()
select {
case <-releaseCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}}
}

// NewErrorSyncer returns a syncer that waits until either `trigger` is closed
// (then returns `errToReturn`) or `ctx` is canceled (then returns ctx.Err).
func NewErrorSyncer(trigger <-chan struct{}, errToReturn error) FuncSyncer {
return FuncSyncer{fn: func(ctx context.Context) error {
select {
case <-trigger:
return errToReturn
case <-ctx.Done():
return ctx.Err()
}
}}
}

// NewCancelAwareSyncer closes `started` as soon as Sync begins, then waits for
// either:
// - `ctx` cancellation: closes `canceled` and returns ctx.Err; or
// - `timeout` elapsing: returns an error indicating a timeout.
//
// Useful for asserting that cancellation propagates to the syncer under test.
func NewCancelAwareSyncer(started, canceled chan struct{}, timeout time.Duration) FuncSyncer {
return FuncSyncer{fn: func(ctx context.Context) error {
close(started)
select {
case <-ctx.Done():
close(canceled)
return ctx.Err()
case <-time.After(timeout):
return errors.New("syncer timed out waiting for cancellation")
}
}}
}
46 changes: 31 additions & 15 deletions sync/vm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
atomicStateSyncOperationName = "Atomic State Sync"
blockSyncOperationName = "Block Sync"
evmStateSyncOperationName = "EVM State Sync"
codeHashSyncOperationName = "Code Hash Sync"
)

var stateSyncSummaryKey = []byte("stateSyncSummary")
Expand Down Expand Up @@ -168,34 +169,45 @@ func (client *client) stateSync(ctx context.Context) error {
}

func (client *client) registerSyncers(ctx context.Context, registry *SyncerRegistry) error {
// Register block syncer.
syncer, err := client.createBlockSyncer(client.summary.GetBlockHash(), client.summary.Height())
blockSyncer, err := client.createBlockSyncer(client.summary.GetBlockHash(), client.summary.Height())
if err != nil {
return fmt.Errorf("failed to create block syncer: %w", err)
}
if err := registry.Register(blockSyncOperationName, syncer); err != nil {
return fmt.Errorf("failed to register block syncer: %w", err)
}

// Register EVM state syncer.
evmSyncer, err := client.createEVMSyncer()
cfg := statesync.NewDefaultConfig(client.RequestSize)

codeSyncer, err := client.createCodeHashSyncer(cfg)
if err != nil {
return fmt.Errorf("failed to create EVM syncer: %w", err)
return fmt.Errorf("failed to create code syncer: %w", err)
}

if err := registry.Register(evmStateSyncOperationName, evmSyncer); err != nil {
return fmt.Errorf("failed to register EVM syncer: %w", err)
stateSyncer, err := client.createEVMSyncer(cfg, codeSyncer)
if err != nil {
return fmt.Errorf("failed to create EVM state syncer: %w", err)
}

// Register atomic syncer.
var atomicSyncer synccommon.Syncer
if client.Extender != nil {
atomicSyncer, err := client.createAtomicSyncer(ctx)
atomicSyncer, err = client.createAtomicSyncer(ctx)
if err != nil {
return fmt.Errorf("failed to create atomic syncer: %w", err)
}
}

// Register in a deterministic order
if err := registry.Register(blockSyncOperationName, blockSyncer); err != nil {
return fmt.Errorf("failed to register %s: %w", blockSyncOperationName, err)
}
if err := registry.Register(codeHashSyncOperationName, codeSyncer); err != nil {
return fmt.Errorf("failed to register %s: %w", codeHashSyncOperationName, err)
}
if err := registry.Register(evmStateSyncOperationName, stateSyncer); err != nil {
return fmt.Errorf("failed to register %s: %w", evmStateSyncOperationName, err)
}

if atomicSyncer != nil {
if err := registry.Register(atomicStateSyncOperationName, atomicSyncer); err != nil {
return fmt.Errorf("failed to register atomic syncer: %w", err)
return fmt.Errorf("failed to register %s: %w", atomicStateSyncOperationName, err)
}
}

Expand All @@ -210,8 +222,12 @@ func (client *client) createBlockSyncer(fromHash common.Hash, fromHeight uint64)
})
}

func (client *client) createEVMSyncer() (synccommon.Syncer, error) {
return statesync.NewSyncer(client.Client, client.ChainDB, client.summary.GetBlockRoot(), statesync.NewDefaultConfig(client.RequestSize))
func (client *client) createEVMSyncer(cfg statesync.Config, sink statesync.CodeHashSink) (synccommon.Syncer, error) {
return statesync.NewSyncer(client.Client, client.ChainDB, client.summary.GetBlockRoot(), sink, cfg)
}

func (client *client) createCodeHashSyncer(cfg statesync.Config) (statesync.CodeSyncerTask, error) {
return statesync.NewCodeSyncer(client.Client, client.ChainDB, cfg)
}

func (client *client) createAtomicSyncer(ctx context.Context) (synccommon.Syncer, error) {
Expand Down
24 changes: 18 additions & 6 deletions sync/vm/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/ava-labs/libevm/log"
"golang.org/x/sync/errgroup"

synccommon "github.com/ava-labs/coreth/sync"
)
Expand Down Expand Up @@ -50,15 +51,26 @@ func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, client *client) err
return nil
}

g, ctx := errgroup.WithContext(ctx)

for _, task := range r.syncers {
log.Info("starting syncer", "name", task.name, "summary", client.summary)
g.Go(func() error {
log.Info("starting syncer", "name", task.name, "summary", client.summary)
if err := task.syncer.Sync(ctx); err != nil {
log.Error("failed syncing", "name", task.name, "summary", client.summary, "err", err)
return fmt.Errorf("%s failed: %w", task.name, err)
}
log.Info("completed successfully", "name", task.name, "summary", client.summary)

return nil
})
}

if err := task.syncer.Sync(ctx); err != nil {
log.Error("failed to complete", "name", task.name, "summary", client.summary, "err", err)
return fmt.Errorf("%s failed: %w", task.name, err)
}
log.Info("completed successfully", "name", task.name, "summary", client.summary)
if err := g.Wait(); err != nil {
return err
}

log.Info("all syncers completed successfully", "count", len(r.syncers), "summary", client.summary)

return nil
}
Loading
Loading