From 401b1f95fc4e32dcf6d0d0fbb317e161e55f1470 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 17:02:10 +0200 Subject: [PATCH 01/16] fix(block/syncing): broadcast on sync nodes --- block/components.go | 19 +++---- block/internal/common/expected_interfaces.go | 13 +++++ block/internal/executing/executor.go | 13 ++--- block/internal/executing/executor_test.go | 7 ++- block/internal/syncing/syncer.go | 56 +++++++++++-------- block/internal/syncing/syncer_backoff_test.go | 30 +++++++--- .../internal/syncing/syncer_benchmark_test.go | 8 +-- block/internal/syncing/syncer_test.go | 20 +++---- node/full.go | 4 +- pkg/sync/sync_service.go | 2 +- 10 files changed, 102 insertions(+), 70 deletions(-) create mode 100644 block/internal/common/expected_interfaces.go diff --git a/block/components.go b/block/components.go index a157817703..115fb4aad7 100644 --- a/block/components.go +++ b/block/components.go @@ -5,10 +5,10 @@ import ( "errors" "fmt" - goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/executing" "github.com/evstack/ev-node/block/internal/reaping" "github.com/evstack/ev-node/block/internal/submitting" @@ -122,11 +122,6 @@ func (bc *Components) Stop() error { return errs } -// broadcaster interface for P2P broadcasting -type broadcaster[T any] interface { - WriteToStoreAndBroadcast(ctx context.Context, payload T) error -} - // NewSyncComponents creates components for a non-aggregator full node that can only sync blocks. // Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA. // They have more sync capabilities than light nodes but no block production. No signer required. @@ -136,8 +131,8 @@ func NewSyncComponents( store store.Store, exec coreexecutor.Executor, da coreda.DA, - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, @@ -158,8 +153,8 @@ func NewSyncComponents( metrics, config, genesis, - headerStore, - dataStore, + headerBroadcaster, + dataBroadcaster, logger, blockOpts, errorCh, @@ -199,8 +194,8 @@ func NewAggregatorComponents( sequencer coresequencer.Sequencer, da coreda.DA, signer signer.Signer, - headerBroadcaster broadcaster[*types.SignedHeader], - dataBroadcaster broadcaster[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, diff --git a/block/internal/common/expected_interfaces.go b/block/internal/common/expected_interfaces.go new file mode 100644 index 0000000000..c167e5da43 --- /dev/null +++ b/block/internal/common/expected_interfaces.go @@ -0,0 +1,13 @@ +package common + +import ( + "context" + + goheader "github.com/celestiaorg/go-header" +) + +// Broadcaster interface for handling P2P stores and broadcasting +type Broadcaster[H goheader.Header[H]] interface { + WriteToStoreAndBroadcast(ctx context.Context, payload H) error + Store() goheader.Store[H] +} diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 2d93a7c464..a8be5a4f62 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -23,11 +23,6 @@ import ( "github.com/evstack/ev-node/types" ) -// broadcaster interface for P2P broadcasting -type broadcaster[T any] interface { - WriteToStoreAndBroadcast(ctx context.Context, payload T) error -} - // Executor handles block production, transaction processing, and state management type Executor struct { // Core components @@ -41,8 +36,8 @@ type Executor struct { metrics *common.Metrics // Broadcasting - headerBroadcaster broadcaster[*types.SignedHeader] - dataBroadcaster broadcaster[*types.Data] + headerBroadcaster common.Broadcaster[*types.SignedHeader] + dataBroadcaster common.Broadcaster[*types.Data] // Configuration config config.Config @@ -81,8 +76,8 @@ func NewExecutor( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerBroadcaster broadcaster[*types.SignedHeader], - dataBroadcaster broadcaster[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, diff --git a/block/internal/executing/executor_test.go b/block/internal/executing/executor_test.go index b5f9e2f47a..d6251015a6 100644 --- a/block/internal/executing/executor_test.go +++ b/block/internal/executing/executor_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + goheader "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -20,7 +21,7 @@ import ( ) // mockBroadcaster for testing -type mockBroadcaster[T any] struct { +type mockBroadcaster[T goheader.Header[T]] struct { called bool payload T } @@ -31,6 +32,10 @@ func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, paylo return nil } +func (m *mockBroadcaster[T]) Store() goheader.Store[T] { + panic("should not need to be needed") +} + func TestExecutor_BroadcasterIntegration(t *testing.T) { // Create in-memory store ds := sync.MutexWrap(datastore.NewMapDatastore()) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 440a08a9c2..11beb96ed8 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -9,8 +9,8 @@ import ( "sync/atomic" "time" - goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" @@ -25,6 +25,7 @@ import ( type daRetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) } + type p2pHandler interface { ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent @@ -53,9 +54,9 @@ type Syncer struct { // DA state daHeight uint64 - // P2P stores - headerStore goheader.Store[*types.SignedHeader] - dataStore goheader.Store[*types.Data] + // P2P handling + headerBroadcaster common.Broadcaster[*types.SignedHeader] + dataBroadcaster common.Broadcaster[*types.Data] // Channels for coordination heightInCh chan common.DAHeightEvent @@ -83,27 +84,27 @@ func NewSyncer( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, ) *Syncer { return &Syncer{ - store: store, - exec: exec, - da: da, - cache: cache, - metrics: metrics, - config: config, - genesis: genesis, - options: options, - headerStore: headerStore, - dataStore: dataStore, - lastStateMtx: &sync.RWMutex{}, - heightInCh: make(chan common.DAHeightEvent, 10_000), - errorCh: errorCh, - logger: logger.With().Str("component", "syncer").Logger(), + store: store, + exec: exec, + da: da, + cache: cache, + metrics: metrics, + config: config, + genesis: genesis, + options: options, + headerBroadcaster: headerBroadcaster, + dataBroadcaster: dataBroadcaster, + lastStateMtx: &sync.RWMutex{}, + heightInCh: make(chan common.DAHeightEvent, 10_000), + errorCh: errorCh, + logger: logger.With().Str("component", "syncer").Logger(), } } @@ -118,7 +119,7 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger) - s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger) + s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.genesis, s.options, s.logger) // Start main processing loop s.wg.Add(1) @@ -327,7 +328,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block select { case <-blockTicker: // Process headers - newHeaderHeight := s.headerStore.Height() + newHeaderHeight := s.headerBroadcaster.Store().Height() if newHeaderHeight > *lastHeaderHeight { events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) for _, event := range events { @@ -344,7 +345,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block } // Process data - newDataHeight := s.dataStore.Height() + newDataHeight := s.headerBroadcaster.Store().Height() if newDataHeight == newHeaderHeight { *lastDataHeight = newDataHeight } else if newDataHeight > *lastDataHeight { @@ -407,6 +408,15 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { } return } + + // broadcast header and data to P2P network + g, ctx := errgroup.WithContext(s.ctx) + g.Go(func() error { return s.headerBroadcaster.WriteToStoreAndBroadcast(ctx, event.Header) }) + g.Go(func() error { return s.dataBroadcaster.WriteToStoreAndBroadcast(ctx, event.Data) }) + if err := g.Wait(); err != nil { + s.logger.Error().Err(err).Msg("failed to broadcast header and/data") + // don't fail block production on broadcast error + } } // errInvalidBlock is returned when a block is failing validation diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 2dc2bd804f..ecaad863b0 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + goheader "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -24,6 +25,19 @@ import ( "github.com/evstack/ev-node/types" ) +// mockBroadcaster for testing +type mockBroadcaster[T goheader.Header[T]] struct { + store goheader.Store[T] +} + +func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error { + return nil +} + +func (m *mockBroadcaster[T]) Store() goheader.Store[T] { + return m.store +} + // TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff // behavior when encountering different types of DA layer errors. func TestSyncer_BackoffOnDAError(t *testing.T) { @@ -76,11 +90,11 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time callCount := 0 @@ -167,11 +181,11 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time @@ -253,11 +267,11 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time @@ -335,8 +349,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 28cf2af22d..355187ceea 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -108,8 +108,8 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay common.NopMetrics(), cfg, gen, - nil, // headerStore not used; we inject P2P directly to channel when needed - nil, // dataStore not used + nil, // we inject P2P directly to channel when needed + nil, // injected when needed zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -152,9 +152,9 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b) headerP2PStore.On("Height").Return(uint64(0)).Maybe() - s.headerStore = headerP2PStore + s.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerP2PStore} dataP2PStore := mocks.NewMockStore[*types.Data](b) dataP2PStore.On("Height").Return(uint64(0)).Maybe() - s.dataStore = dataP2PStore + s.dataBroadcaster = &mockBroadcaster[*types.Data]{dataP2PStore} return &benchFixture{s: s, st: st, cm: cm, cancel: cancel} } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 0fe7d5192e..067ca04319 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -107,8 +107,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -155,8 +155,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -205,8 +205,8 @@ func TestSequentialBlockSync(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -332,8 +332,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -409,8 +409,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), diff --git a/node/full.go b/node/full.go index c1704355dc..947c164785 100644 --- a/node/full.go +++ b/node/full.go @@ -120,8 +120,8 @@ func newFullNode( rktStore, exec, da, - headerSyncService.Store(), - dataSyncService.Store(), + headerSyncService, + dataSyncService, logger, blockMetrics, nodeOpts.BlockOptions, diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 6af6e40ab8..f7df81db97 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -117,7 +117,7 @@ func newSyncService[H header.Header[H]]( } // Store returns the store of the SyncService -func (syncService *SyncService[H]) Store() *goheaderstore.Store[H] { +func (syncService *SyncService[H]) Store() header.Store[H] { return syncService.store } From a778aaccb0c7e479a82b98530fbc33b7c9f04071 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 17:04:42 +0200 Subject: [PATCH 02/16] wording --- block/internal/executing/executor.go | 2 +- block/internal/executing/executor_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index a8be5a4f62..ec859a1ae4 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -35,7 +35,7 @@ type Executor struct { cache cache.Manager metrics *common.Metrics - // Broadcasting + // P2P handling headerBroadcaster common.Broadcaster[*types.SignedHeader] dataBroadcaster common.Broadcaster[*types.Data] diff --git a/block/internal/executing/executor_test.go b/block/internal/executing/executor_test.go index d6251015a6..bf8f752a5d 100644 --- a/block/internal/executing/executor_test.go +++ b/block/internal/executing/executor_test.go @@ -33,7 +33,7 @@ func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, paylo } func (m *mockBroadcaster[T]) Store() goheader.Store[T] { - panic("should not need to be needed") + panic("should not be needed") } func TestExecutor_BroadcasterIntegration(t *testing.T) { From 18ec45d15fb5c487626f6f311e135e40cb2dafb2 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 17:05:30 +0200 Subject: [PATCH 03/16] typo --- block/internal/syncing/syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 11beb96ed8..49b24f4a1b 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -345,7 +345,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block } // Process data - newDataHeight := s.headerBroadcaster.Store().Height() + newDataHeight := s.dataBroadcaster.Store().Height() if newDataHeight == newHeaderHeight { *lastDataHeight = newDataHeight } else if newDataHeight > *lastDataHeight { From aaecf394da91a22f576068c1943fe1a09ff8bc84 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 17:25:48 +0200 Subject: [PATCH 04/16] simplify!! --- block/components.go | 19 +++--- block/internal/common/expected_interfaces.go | 13 ---- block/internal/executing/executor.go | 15 +++-- block/internal/executing/executor_test.go | 7 +-- block/internal/syncing/syncer.go | 63 ++++++++++--------- block/internal/syncing/syncer_backoff_test.go | 30 +++------ .../internal/syncing/syncer_benchmark_test.go | 8 +-- block/internal/syncing/syncer_test.go | 20 +++--- node/full.go | 4 +- 9 files changed, 79 insertions(+), 100 deletions(-) delete mode 100644 block/internal/common/expected_interfaces.go diff --git a/block/components.go b/block/components.go index 115fb4aad7..a157817703 100644 --- a/block/components.go +++ b/block/components.go @@ -5,10 +5,10 @@ import ( "errors" "fmt" + goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/cache" - "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/executing" "github.com/evstack/ev-node/block/internal/reaping" "github.com/evstack/ev-node/block/internal/submitting" @@ -122,6 +122,11 @@ func (bc *Components) Stop() error { return errs } +// broadcaster interface for P2P broadcasting +type broadcaster[T any] interface { + WriteToStoreAndBroadcast(ctx context.Context, payload T) error +} + // NewSyncComponents creates components for a non-aggregator full node that can only sync blocks. // Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA. // They have more sync capabilities than light nodes but no block production. No signer required. @@ -131,8 +136,8 @@ func NewSyncComponents( store store.Store, exec coreexecutor.Executor, da coreda.DA, - headerBroadcaster common.Broadcaster[*types.SignedHeader], - dataBroadcaster common.Broadcaster[*types.Data], + headerStore goheader.Store[*types.SignedHeader], + dataStore goheader.Store[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, @@ -153,8 +158,8 @@ func NewSyncComponents( metrics, config, genesis, - headerBroadcaster, - dataBroadcaster, + headerStore, + dataStore, logger, blockOpts, errorCh, @@ -194,8 +199,8 @@ func NewAggregatorComponents( sequencer coresequencer.Sequencer, da coreda.DA, signer signer.Signer, - headerBroadcaster common.Broadcaster[*types.SignedHeader], - dataBroadcaster common.Broadcaster[*types.Data], + headerBroadcaster broadcaster[*types.SignedHeader], + dataBroadcaster broadcaster[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, diff --git a/block/internal/common/expected_interfaces.go b/block/internal/common/expected_interfaces.go deleted file mode 100644 index c167e5da43..0000000000 --- a/block/internal/common/expected_interfaces.go +++ /dev/null @@ -1,13 +0,0 @@ -package common - -import ( - "context" - - goheader "github.com/celestiaorg/go-header" -) - -// Broadcaster interface for handling P2P stores and broadcasting -type Broadcaster[H goheader.Header[H]] interface { - WriteToStoreAndBroadcast(ctx context.Context, payload H) error - Store() goheader.Store[H] -} diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index ec859a1ae4..2d93a7c464 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -23,6 +23,11 @@ import ( "github.com/evstack/ev-node/types" ) +// broadcaster interface for P2P broadcasting +type broadcaster[T any] interface { + WriteToStoreAndBroadcast(ctx context.Context, payload T) error +} + // Executor handles block production, transaction processing, and state management type Executor struct { // Core components @@ -35,9 +40,9 @@ type Executor struct { cache cache.Manager metrics *common.Metrics - // P2P handling - headerBroadcaster common.Broadcaster[*types.SignedHeader] - dataBroadcaster common.Broadcaster[*types.Data] + // Broadcasting + headerBroadcaster broadcaster[*types.SignedHeader] + dataBroadcaster broadcaster[*types.Data] // Configuration config config.Config @@ -76,8 +81,8 @@ func NewExecutor( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerBroadcaster common.Broadcaster[*types.SignedHeader], - dataBroadcaster common.Broadcaster[*types.Data], + headerBroadcaster broadcaster[*types.SignedHeader], + dataBroadcaster broadcaster[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, diff --git a/block/internal/executing/executor_test.go b/block/internal/executing/executor_test.go index bf8f752a5d..b5f9e2f47a 100644 --- a/block/internal/executing/executor_test.go +++ b/block/internal/executing/executor_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - goheader "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -21,7 +20,7 @@ import ( ) // mockBroadcaster for testing -type mockBroadcaster[T goheader.Header[T]] struct { +type mockBroadcaster[T any] struct { called bool payload T } @@ -32,10 +31,6 @@ func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, paylo return nil } -func (m *mockBroadcaster[T]) Store() goheader.Store[T] { - panic("should not be needed") -} - func TestExecutor_BroadcasterIntegration(t *testing.T) { // Create in-memory store ds := sync.MutexWrap(datastore.NewMapDatastore()) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 49b24f4a1b..d39700fa15 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -9,8 +9,8 @@ import ( "sync/atomic" "time" + goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" - "golang.org/x/sync/errgroup" "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" @@ -25,7 +25,6 @@ import ( type daRetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) } - type p2pHandler interface { ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent @@ -54,9 +53,9 @@ type Syncer struct { // DA state daHeight uint64 - // P2P handling - headerBroadcaster common.Broadcaster[*types.SignedHeader] - dataBroadcaster common.Broadcaster[*types.Data] + // P2P stores + headerStore goheader.Store[*types.SignedHeader] + dataStore goheader.Store[*types.Data] // Channels for coordination heightInCh chan common.DAHeightEvent @@ -84,27 +83,27 @@ func NewSyncer( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerBroadcaster common.Broadcaster[*types.SignedHeader], - dataBroadcaster common.Broadcaster[*types.Data], + headerStore goheader.Store[*types.SignedHeader], + dataStore goheader.Store[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, ) *Syncer { return &Syncer{ - store: store, - exec: exec, - da: da, - cache: cache, - metrics: metrics, - config: config, - genesis: genesis, - options: options, - headerBroadcaster: headerBroadcaster, - dataBroadcaster: dataBroadcaster, - lastStateMtx: &sync.RWMutex{}, - heightInCh: make(chan common.DAHeightEvent, 10_000), - errorCh: errorCh, - logger: logger.With().Str("component", "syncer").Logger(), + store: store, + exec: exec, + da: da, + cache: cache, + metrics: metrics, + config: config, + genesis: genesis, + options: options, + headerStore: headerStore, + dataStore: dataStore, + lastStateMtx: &sync.RWMutex{}, + heightInCh: make(chan common.DAHeightEvent, 10_000), + errorCh: errorCh, + logger: logger.With().Str("component", "syncer").Logger(), } } @@ -119,7 +118,7 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger) - s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.genesis, s.options, s.logger) + s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger) // Start main processing loop s.wg.Add(1) @@ -328,7 +327,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block select { case <-blockTicker: // Process headers - newHeaderHeight := s.headerBroadcaster.Store().Height() + newHeaderHeight := s.headerStore.Height() if newHeaderHeight > *lastHeaderHeight { events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) for _, event := range events { @@ -345,7 +344,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block } // Process data - newDataHeight := s.dataBroadcaster.Store().Height() + newDataHeight := s.dataStore.Height() if newDataHeight == newHeaderHeight { *lastDataHeight = newDataHeight } else if newDataHeight > *lastDataHeight { @@ -409,13 +408,15 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { return } - // broadcast header and data to P2P network - g, ctx := errgroup.WithContext(s.ctx) - g.Go(func() error { return s.headerBroadcaster.WriteToStoreAndBroadcast(ctx, event.Header) }) - g.Go(func() error { return s.dataBroadcaster.WriteToStoreAndBroadcast(ctx, event.Data) }) - if err := g.Wait(); err != nil { - s.logger.Error().Err(err).Msg("failed to broadcast header and/data") - // don't fail block production on broadcast error + // Append new event to p2p stores + if err := s.headerStore.Append(s.ctx, event.Header); err != nil { + s.logger.Error().Err(err).Msg("failed to append event header to p2p store") + return + } + + if err := s.dataStore.Append(s.ctx, event.Data); err != nil { + s.logger.Error().Err(err).Msg("failed to append event data to p2p store") + return } } diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index ecaad863b0..2dc2bd804f 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - goheader "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -25,19 +24,6 @@ import ( "github.com/evstack/ev-node/types" ) -// mockBroadcaster for testing -type mockBroadcaster[T goheader.Header[T]] struct { - store goheader.Store[T] -} - -func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error { - return nil -} - -func (m *mockBroadcaster[T]) Store() goheader.Store[T] { - return m.store -} - // TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff // behavior when encountering different types of DA layer errors. func TestSyncer_BackoffOnDAError(t *testing.T) { @@ -90,11 +76,11 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} + syncer.headerStore = headerStore dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} + syncer.dataStore = dataStore var callTimes []time.Time callCount := 0 @@ -181,11 +167,11 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} + syncer.headerStore = headerStore dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} + syncer.dataStore = dataStore var callTimes []time.Time @@ -267,11 +253,11 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} + syncer.headerStore = headerStore dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} + syncer.dataStore = dataStore var callTimes []time.Time @@ -349,8 +335,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { common.NopMetrics(), cfg, gen, - &mockBroadcaster[*types.SignedHeader]{}, - &mockBroadcaster[*types.Data]{}, + nil, + nil, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 355187ceea..28cf2af22d 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -108,8 +108,8 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay common.NopMetrics(), cfg, gen, - nil, // we inject P2P directly to channel when needed - nil, // injected when needed + nil, // headerStore not used; we inject P2P directly to channel when needed + nil, // dataStore not used zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -152,9 +152,9 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b) headerP2PStore.On("Height").Return(uint64(0)).Maybe() - s.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerP2PStore} + s.headerStore = headerP2PStore dataP2PStore := mocks.NewMockStore[*types.Data](b) dataP2PStore.On("Height").Return(uint64(0)).Maybe() - s.dataBroadcaster = &mockBroadcaster[*types.Data]{dataP2PStore} + s.dataStore = dataP2PStore return &benchFixture{s: s, st: st, cm: cm, cancel: cancel} } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 067ca04319..0fe7d5192e 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -107,8 +107,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.NopMetrics(), cfg, gen, - &mockBroadcaster[*types.SignedHeader]{}, - &mockBroadcaster[*types.Data]{}, + nil, + nil, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -155,8 +155,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NopMetrics(), cfg, gen, - &mockBroadcaster[*types.SignedHeader]{}, - &mockBroadcaster[*types.Data]{}, + nil, + nil, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -205,8 +205,8 @@ func TestSequentialBlockSync(t *testing.T) { common.NopMetrics(), cfg, gen, - &mockBroadcaster[*types.SignedHeader]{}, - &mockBroadcaster[*types.Data]{}, + nil, + nil, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -332,8 +332,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - &mockBroadcaster[*types.SignedHeader]{}, - &mockBroadcaster[*types.Data]{}, + nil, + nil, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -409,8 +409,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - &mockBroadcaster[*types.SignedHeader]{}, - &mockBroadcaster[*types.Data]{}, + nil, + nil, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), diff --git a/node/full.go b/node/full.go index 947c164785..c1704355dc 100644 --- a/node/full.go +++ b/node/full.go @@ -120,8 +120,8 @@ func newFullNode( rktStore, exec, da, - headerSyncService, - dataSyncService, + headerSyncService.Store(), + dataSyncService.Store(), logger, blockMetrics, nodeOpts.BlockOptions, From 2fade2726b32fc3a8663b53317bba948cc82927b Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 17:35:30 +0200 Subject: [PATCH 05/16] tests --- block/internal/syncing/syncer_backoff_test.go | 4 +- block/internal/syncing/syncer_test.go | 41 ++++++++++++++----- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 2dc2bd804f..bd1eb6c663 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -335,8 +335,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { common.NopMetrics(), cfg, gen, - nil, - nil, + &mocks.MockStore[*types.SignedHeader]{}, + &mocks.MockStore[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 0fe7d5192e..cf0c7d3c44 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -13,6 +13,7 @@ import ( signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" testmocks "github.com/evstack/ev-node/test/mocks" + mocks "github.com/evstack/ev-node/test/mocks/external" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" @@ -107,8 +108,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mocks.MockStore[*types.SignedHeader]{}, + &mocks.MockStore[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -147,6 +148,11 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) + mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} + mockP2PHeaderStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() + mockP2PDataStore := &mocks.MockStore[*types.Data]{} + mockP2PDataStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() + s := NewSyncer( st, mockExec, @@ -155,8 +161,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + mockP2PHeaderStore, + mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -197,6 +203,11 @@ func TestSequentialBlockSync(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) + mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} + mockP2PHeaderStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() + mockP2PDataStore := &mocks.MockStore[*types.Data]{} + mockP2PDataStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() + s := NewSyncer( st, mockExec, @@ -205,8 +216,8 @@ func TestSequentialBlockSync(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + mockP2PHeaderStore, + mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -324,6 +335,11 @@ func TestSyncLoopPersistState(t *testing.T) { dummyExec := execution.NewDummyExecutor() + mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} + mockP2PHeaderStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() + mockP2PDataStore := &mocks.MockStore[*types.Data]{} + mockP2PDataStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() + syncerInst1 := NewSyncer( st, dummyExec, @@ -332,8 +348,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + mockP2PHeaderStore, + mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -401,6 +417,11 @@ func TestSyncLoopPersistState(t *testing.T) { require.NoError(t, err) require.NoError(t, cm.LoadFromDisk()) + mockP2PHeaderStore2 := &mocks.MockStore[*types.SignedHeader]{} + mockP2PHeaderStore2.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() + mockP2PDataStore2 := &mocks.MockStore[*types.Data]{} + mockP2PDataStore2.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() + syncerInst2 := NewSyncer( st, dummyExec, @@ -409,8 +430,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + mockP2PHeaderStore2, + mockP2PDataStore2, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), From 1eba9371b0940b9340f81269ae5a1288e73b399d Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 18:03:32 +0200 Subject: [PATCH 06/16] trusting period + async save --- block/internal/syncing/syncer.go | 16 +++++++--------- pkg/sync/sync_service.go | 1 + 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index d39700fa15..ebff868235 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -11,6 +11,7 @@ import ( goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" @@ -408,15 +409,12 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { return } - // Append new event to p2p stores - if err := s.headerStore.Append(s.ctx, event.Header); err != nil { - s.logger.Error().Err(err).Msg("failed to append event header to p2p store") - return - } - - if err := s.dataStore.Append(s.ctx, event.Data); err != nil { - s.logger.Error().Err(err).Msg("failed to append event data to p2p store") - return + // save header and data to P2P stores + g, ctx := errgroup.WithContext(s.ctx) + g.Go(func() error { return s.headerStore.Append(ctx, event.Header) }) + g.Go(func() error { return s.dataStore.Append(ctx, event.Data) }) + if err := g.Wait(); err != nil { + s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store") } } diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index f7df81db97..ccaa804535 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -420,6 +420,7 @@ func newSyncer[H header.Header[H]]( opts = append(opts, goheadersync.WithMetrics(), goheadersync.WithPruningWindow(ninetyNineYears), + goheadersync.WithTrustingPeriod(ninetyNineYears), ) return goheadersync.NewSyncer(ex, store, sub, opts...) } From 2991c643ce44ba7c8f22d629c617fc9736d56f66 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 6 Oct 2025 19:47:45 +0200 Subject: [PATCH 07/16] revert to writetostoreandbroadcast --- block/components.go | 11 +++++++---- block/internal/common/expected_interfaces.go | 13 +++++++++++++ block/internal/executing/executor.go | 13 ++++--------- block/internal/executing/executor_test.go | 11 ++++++++--- block/internal/syncing/syncer.go | 19 +++++++++---------- node/full.go | 4 ++-- 6 files changed, 43 insertions(+), 28 deletions(-) create mode 100644 block/internal/common/expected_interfaces.go diff --git a/block/components.go b/block/components.go index a157817703..1e5f59d872 100644 --- a/block/components.go +++ b/block/components.go @@ -9,6 +9,7 @@ import ( "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/executing" "github.com/evstack/ev-node/block/internal/reaping" "github.com/evstack/ev-node/block/internal/submitting" @@ -123,8 +124,10 @@ func (bc *Components) Stop() error { } // broadcaster interface for P2P broadcasting -type broadcaster[T any] interface { - WriteToStoreAndBroadcast(ctx context.Context, payload T) error +// broadcaster interface for P2P broadcasting +type broadcaster[H goheader.Header[H]] interface { + WriteToStoreAndBroadcast(ctx context.Context, payload H) error + Store() goheader.Store[H] } // NewSyncComponents creates components for a non-aggregator full node that can only sync blocks. @@ -136,8 +139,8 @@ func NewSyncComponents( store store.Store, exec coreexecutor.Executor, da coreda.DA, - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerStore common.Broadcaster[*types.SignedHeader], + dataStore common.Broadcaster[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, diff --git a/block/internal/common/expected_interfaces.go b/block/internal/common/expected_interfaces.go new file mode 100644 index 0000000000..c167e5da43 --- /dev/null +++ b/block/internal/common/expected_interfaces.go @@ -0,0 +1,13 @@ +package common + +import ( + "context" + + goheader "github.com/celestiaorg/go-header" +) + +// Broadcaster interface for handling P2P stores and broadcasting +type Broadcaster[H goheader.Header[H]] interface { + WriteToStoreAndBroadcast(ctx context.Context, payload H) error + Store() goheader.Store[H] +} diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 2d93a7c464..a8be5a4f62 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -23,11 +23,6 @@ import ( "github.com/evstack/ev-node/types" ) -// broadcaster interface for P2P broadcasting -type broadcaster[T any] interface { - WriteToStoreAndBroadcast(ctx context.Context, payload T) error -} - // Executor handles block production, transaction processing, and state management type Executor struct { // Core components @@ -41,8 +36,8 @@ type Executor struct { metrics *common.Metrics // Broadcasting - headerBroadcaster broadcaster[*types.SignedHeader] - dataBroadcaster broadcaster[*types.Data] + headerBroadcaster common.Broadcaster[*types.SignedHeader] + dataBroadcaster common.Broadcaster[*types.Data] // Configuration config config.Config @@ -81,8 +76,8 @@ func NewExecutor( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerBroadcaster broadcaster[*types.SignedHeader], - dataBroadcaster broadcaster[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, diff --git a/block/internal/executing/executor_test.go b/block/internal/executing/executor_test.go index b5f9e2f47a..4058bf8043 100644 --- a/block/internal/executing/executor_test.go +++ b/block/internal/executing/executor_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -20,17 +21,21 @@ import ( ) // mockBroadcaster for testing -type mockBroadcaster[T any] struct { +type mockBroadcaster[H header.Header[H]] struct { called bool - payload T + payload H } -func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error { +func (m *mockBroadcaster[H]) WriteToStoreAndBroadcast(ctx context.Context, payload H) error { m.called = true m.payload = payload return nil } +func (m *mockBroadcaster[H]) Store() header.Store[H] { + return nil +} + func TestExecutor_BroadcasterIntegration(t *testing.T) { // Create in-memory store ds := sync.MutexWrap(datastore.NewMapDatastore()) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index ebff868235..f06ec7da90 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -9,7 +9,6 @@ import ( "sync/atomic" "time" - goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" @@ -55,8 +54,8 @@ type Syncer struct { daHeight uint64 // P2P stores - headerStore goheader.Store[*types.SignedHeader] - dataStore goheader.Store[*types.Data] + headerStore common.Broadcaster[*types.SignedHeader] + dataStore common.Broadcaster[*types.Data] // Channels for coordination heightInCh chan common.DAHeightEvent @@ -84,8 +83,8 @@ func NewSyncer( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerStore common.Broadcaster[*types.SignedHeader], + dataStore common.Broadcaster[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, @@ -119,7 +118,7 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger) - s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger) + s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.genesis, s.options, s.logger) // Start main processing loop s.wg.Add(1) @@ -328,7 +327,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block select { case <-blockTicker: // Process headers - newHeaderHeight := s.headerStore.Height() + newHeaderHeight := s.headerStore.Store().Height() if newHeaderHeight > *lastHeaderHeight { events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) for _, event := range events { @@ -345,7 +344,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block } // Process data - newDataHeight := s.dataStore.Height() + newDataHeight := s.dataStore.Store().Height() if newDataHeight == newHeaderHeight { *lastDataHeight = newDataHeight } else if newDataHeight > *lastDataHeight { @@ -411,8 +410,8 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { // save header and data to P2P stores g, ctx := errgroup.WithContext(s.ctx) - g.Go(func() error { return s.headerStore.Append(ctx, event.Header) }) - g.Go(func() error { return s.dataStore.Append(ctx, event.Data) }) + g.Go(func() error { return s.headerStore.WriteToStoreAndBroadcast(ctx, event.Header) }) + g.Go(func() error { return s.dataStore.WriteToStoreAndBroadcast(ctx, event.Data) }) if err := g.Wait(); err != nil { s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store") } diff --git a/node/full.go b/node/full.go index c1704355dc..947c164785 100644 --- a/node/full.go +++ b/node/full.go @@ -120,8 +120,8 @@ func newFullNode( rktStore, exec, da, - headerSyncService.Store(), - dataSyncService.Store(), + headerSyncService, + dataSyncService, logger, blockMetrics, nodeOpts.BlockOptions, From 510de9eee2968826cb685b499cc53b96bd9c4f72 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 6 Oct 2025 20:36:01 +0200 Subject: [PATCH 08/16] update the syner to ignore empty datahashes --- block/internal/syncing/syncer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index f06ec7da90..645283293f 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -411,7 +411,10 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { // save header and data to P2P stores g, ctx := errgroup.WithContext(s.ctx) g.Go(func() error { return s.headerStore.WriteToStoreAndBroadcast(ctx, event.Header) }) - g.Go(func() error { return s.dataStore.WriteToStoreAndBroadcast(ctx, event.Data) }) + // we only need to save data if it's not empty + if !bytes.Equal(event.Header.Hash(), common.DataHashForEmptyTxs) { + g.Go(func() error { return s.dataStore.WriteToStoreAndBroadcast(ctx, event.Data) }) + } if err := g.Wait(); err != nil { s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store") } From 26a7922f32cf7501d2fee283d81df37bc12b32d1 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 6 Oct 2025 20:46:30 +0200 Subject: [PATCH 09/16] timing hack --- block/internal/syncing/p2p_handler.go | 59 +++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 8 deletions(-) diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index cd369e44a0..7af659eb95 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -3,7 +3,9 @@ package syncing import ( "bytes" "context" + "errors" "fmt" + "time" goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" @@ -54,8 +56,18 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei default: } - header, err := h.headerStore.GetByHeight(ctx, height) + // Create a timeout context for each GetByHeight call to prevent blocking + timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + header, err := h.headerStore.GetByHeight(timeoutCtx, height) + cancel() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later") + // Don't continue processing further heights if we timeout on one + // This prevents blocking on sequential heights + return events + } h.logger.Debug().Uint64("height", height).Err(err).Msg("failed to get header from store") continue } @@ -72,9 +84,18 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei // Create empty data for headers with empty data hash data = h.createEmptyDataForHeader(ctx, header) } else { - // Try to get data from data store - retrievedData, err := h.dataStore.GetByHeight(ctx, height) + // Try to get data from data store with timeout + timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + retrievedData, err := h.dataStore.GetByHeight(timeoutCtx, height) + cancel() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later") + // Don't continue processing if data is not available + // Store event with header only for later processing + continue + } h.logger.Debug().Uint64("height", height).Err(err).Msg("could not retrieve data for header from data store") continue } @@ -114,15 +135,33 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh default: } - data, err := h.dataStore.GetByHeight(ctx, height) + // Create a timeout context for each GetByHeight call to prevent blocking + timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + data, err := h.dataStore.GetByHeight(timeoutCtx, height) + cancel() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later") + // Don't continue processing further heights if we timeout on one + // This prevents blocking on sequential heights + return events + } h.logger.Debug().Uint64("height", height).Err(err).Msg("failed to get data from store") continue } - // Get corresponding header - header, err := h.headerStore.GetByHeight(ctx, height) + // Get corresponding header with timeout + timeoutCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond) + header, err := h.headerStore.GetByHeight(timeoutCtx, height) + cancel() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later") + // Don't continue processing if header is not available + continue + } h.logger.Debug().Uint64("height", height).Err(err).Msg("could not retrieve header for data from header store") continue } @@ -166,8 +205,12 @@ func (h *P2PHandler) createEmptyDataForHeader(ctx context.Context, header *types var lastDataHash types.Hash if headerHeight > 1 { - // Try to get previous data hash, but don't fail if not available - if prevData, err := h.dataStore.GetByHeight(ctx, headerHeight-1); err == nil && prevData != nil { + // Try to get previous data hash with a short timeout, but don't fail if not available + timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + prevData, err := h.dataStore.GetByHeight(timeoutCtx, headerHeight-1) + cancel() + + if err == nil && prevData != nil { lastDataHash = prevData.Hash() } else { h.logger.Debug().Uint64("current_height", headerHeight).Uint64("previous_height", headerHeight-1). From 253841cdeff87ac9ee15d36655115b59c2c5f34d Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 6 Oct 2025 20:58:11 +0200 Subject: [PATCH 10/16] add source to event for processing --- block/internal/common/event.go | 12 +++++++++++ block/internal/syncing/da_retriever.go | 1 + block/internal/syncing/p2p_handler.go | 2 ++ block/internal/syncing/syncer.go | 28 +++++++++++++++++--------- 4 files changed, 34 insertions(+), 9 deletions(-) diff --git a/block/internal/common/event.go b/block/internal/common/event.go index 227b1ed391..69d0300f9f 100644 --- a/block/internal/common/event.go +++ b/block/internal/common/event.go @@ -2,10 +2,22 @@ package common import "github.com/evstack/ev-node/types" +// EventSource represents the origin of a block event +type EventSource string + +const ( + // SourceDA indicates the event came from the DA layer + SourceDA EventSource = "DA" + // SourceP2P indicates the event came from P2P network + SourceP2P EventSource = "P2P" +) + // DAHeightEvent represents a DA event for caching type DAHeightEvent struct { Header *types.SignedHeader Data *types.Data // DaHeight corresponds to the highest DA included height between the Header and Data. DaHeight uint64 + // Source indicates where this event originated from (DA or P2P) + Source EventSource } diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index b579101511..33ae79bca6 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -210,6 +210,7 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight Header: header, Data: data, DaHeight: daHeight, + Source: common.SourceDA, } events = append(events, event) diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index 7af659eb95..c3f3f0efab 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -110,6 +110,7 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei Header: header, Data: data, DaHeight: 0, // P2P events don't have DA height context + Source: common.SourceP2P, } events = append(events, event) @@ -180,6 +181,7 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh Header: header, Data: data, DaHeight: 0, // P2P events don't have DA height context + Source: common.SourceP2P, } events = append(events, event) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 645283293f..35b144088a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -376,6 +376,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { Uint64("height", height). Uint64("da_height", event.DaHeight). Str("hash", headerHash). + Str("source", string(event.Source)). Msg("processing height event") currentHeight, err := s.store.Height(s.ctx) @@ -408,15 +409,23 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { return } - // save header and data to P2P stores - g, ctx := errgroup.WithContext(s.ctx) - g.Go(func() error { return s.headerStore.WriteToStoreAndBroadcast(ctx, event.Header) }) - // we only need to save data if it's not empty - if !bytes.Equal(event.Header.Hash(), common.DataHashForEmptyTxs) { - g.Go(func() error { return s.dataStore.WriteToStoreAndBroadcast(ctx, event.Data) }) - } - if err := g.Wait(); err != nil { - s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store") + // Only save to P2P stores if the event came from DA + // P2P events are already in the P2P stores, so we don't need to write them back + if event.Source == common.SourceDA { + g, ctx := errgroup.WithContext(s.ctx) + g.Go(func() error { return s.headerStore.WriteToStoreAndBroadcast(ctx, event.Header) }) + // we only need to save data if it's not empty + if !bytes.Equal(event.Header.Hash(), common.DataHashForEmptyTxs) { + g.Go(func() error { return s.dataStore.WriteToStoreAndBroadcast(ctx, event.Data) }) + } + if err := g.Wait(); err != nil { + s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store") + } + } else if event.Source == common.SourceP2P { + s.logger.Debug(). + Uint64("height", event.Header.Height()). + Str("source", string(event.Source)). + Msg("skipping P2P store write for P2P-sourced event") } } @@ -612,6 +621,7 @@ func (s *Syncer) processPendingEvents() { Header: event.Header, Data: event.Data, DaHeight: event.DaHeight, + Source: event.Source, } select { From 42d091093a441c493b9e83c6f6e5362df717c97f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 21:57:03 +0200 Subject: [PATCH 11/16] cleanups + fix tests --- block/components.go | 20 ++---- block/internal/executing/executor.go | 2 +- block/internal/executing/executor_test.go | 12 ++-- block/internal/syncing/p2p_handler_test.go | 31 +++++----- block/internal/syncing/syncer.go | 61 ++++++++----------- block/internal/syncing/syncer_backoff_test.go | 30 ++++++--- .../internal/syncing/syncer_benchmark_test.go | 8 +-- block/internal/syncing/syncer_test.go | 41 +++---------- 8 files changed, 91 insertions(+), 114 deletions(-) diff --git a/block/components.go b/block/components.go index 1e5f59d872..115fb4aad7 100644 --- a/block/components.go +++ b/block/components.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/cache" @@ -123,13 +122,6 @@ func (bc *Components) Stop() error { return errs } -// broadcaster interface for P2P broadcasting -// broadcaster interface for P2P broadcasting -type broadcaster[H goheader.Header[H]] interface { - WriteToStoreAndBroadcast(ctx context.Context, payload H) error - Store() goheader.Store[H] -} - // NewSyncComponents creates components for a non-aggregator full node that can only sync blocks. // Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA. // They have more sync capabilities than light nodes but no block production. No signer required. @@ -139,8 +131,8 @@ func NewSyncComponents( store store.Store, exec coreexecutor.Executor, da coreda.DA, - headerStore common.Broadcaster[*types.SignedHeader], - dataStore common.Broadcaster[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, @@ -161,8 +153,8 @@ func NewSyncComponents( metrics, config, genesis, - headerStore, - dataStore, + headerBroadcaster, + dataBroadcaster, logger, blockOpts, errorCh, @@ -202,8 +194,8 @@ func NewAggregatorComponents( sequencer coresequencer.Sequencer, da coreda.DA, signer signer.Signer, - headerBroadcaster broadcaster[*types.SignedHeader], - dataBroadcaster broadcaster[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index a8be5a4f62..ec859a1ae4 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -35,7 +35,7 @@ type Executor struct { cache cache.Manager metrics *common.Metrics - // Broadcasting + // P2P handling headerBroadcaster common.Broadcaster[*types.SignedHeader] dataBroadcaster common.Broadcaster[*types.Data] diff --git a/block/internal/executing/executor_test.go b/block/internal/executing/executor_test.go index 4058bf8043..bf8f752a5d 100644 --- a/block/internal/executing/executor_test.go +++ b/block/internal/executing/executor_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - "github.com/celestiaorg/go-header" + goheader "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -21,19 +21,19 @@ import ( ) // mockBroadcaster for testing -type mockBroadcaster[H header.Header[H]] struct { +type mockBroadcaster[T goheader.Header[T]] struct { called bool - payload H + payload T } -func (m *mockBroadcaster[H]) WriteToStoreAndBroadcast(ctx context.Context, payload H) error { +func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error { m.called = true m.payload = payload return nil } -func (m *mockBroadcaster[H]) Store() header.Store[H] { - return nil +func (m *mockBroadcaster[T]) Store() goheader.Store[T] { + panic("should not be needed") } func TestExecutor_BroadcasterIntegration(t *testing.T) { diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 11f24caca1..ccce5f1be1 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/evstack/ev-node/block/internal/common" @@ -104,8 +105,8 @@ func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { // Sanity: header should validate with data using default sync verifier require.NoError(t, signedHeader.ValidateBasicWithData(blockData), "header+data must validate before handler processes them") - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(signedHeader, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(blockData, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(signedHeader, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(blockData, nil).Once() events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 5) require.Len(t, events, 1, "expected one event for the provided header/data height") @@ -125,8 +126,8 @@ func TestP2PHandler_ProcessHeaderRange_MissingData_NonEmptyHash(t *testing.T) { blockData := makeData(p2pData.Genesis.ChainID, 7, 1) signedHeader.DataHash = blockData.DACommitment() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(7)).Return(signedHeader, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(7)).Return(nil, errors.New("not found")).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(signedHeader, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("not found")).Once() events := p2pData.Handler.ProcessHeaderRange(ctx, 7, 7) require.Len(t, events, 0) @@ -137,8 +138,8 @@ func TestP2PHandler_ProcessDataRange_HeaderMissing(t *testing.T) { ctx := context.Background() blockData := makeData(p2pData.Genesis.ChainID, 9, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(blockData, nil).Once() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(nil, errors.New("no header")).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(blockData, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("no header")).Once() events := p2pData.Handler.ProcessDataRange(ctx, 9, 9) require.Len(t, events, 0) @@ -154,7 +155,7 @@ func TestP2PHandler_ProposerMismatch_Rejected(t *testing.T) { signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 4, badAddr, pub, signer) signedHeader.DataHash = common.DataHashForEmptyTxs - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(4)).Return(signedHeader, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(4)).Return(signedHeader, nil).Once() events := p2pData.Handler.ProcessHeaderRange(ctx, 4, 4) require.Len(t, events, 0) @@ -170,7 +171,7 @@ func TestP2PHandler_CreateEmptyDataForHeader_UsesPreviousDataHash(t *testing.T) // Mock previous data at height 9 so handler can propagate its hash previousData := makeData(p2pData.Genesis.ChainID, 9, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(previousData, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(previousData, nil).Once() emptyData := p2pData.Handler.createEmptyDataForHeader(ctx, signedHeader) require.NotNil(t, emptyData, "handler should synthesize empty data when header declares empty data hash") @@ -189,7 +190,7 @@ func TestP2PHandler_CreateEmptyDataForHeader_NoPreviousData(t *testing.T) { signedHeader.DataHash = common.DataHashForEmptyTxs // Mock previous data fetch failure - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(1)).Return(nil, errors.New("not available")).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(1)).Return(nil, errors.New("not available")).Once() emptyData := p2pData.Handler.createEmptyDataForHeader(ctx, signedHeader) require.NotNil(t, emptyData, "handler should synthesize empty data even when previous data is unavailable") @@ -229,10 +230,10 @@ func TestP2PHandler_ProcessHeaderRange_MultipleHeightsHappyPath(t *testing.T) { require.NoError(t, header6.ValidateBasicWithData(data6), "header/data invalid for height 6") // Expectations for both heights - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(header5, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(data5, nil).Once() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(6)).Return(header6, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(6)).Return(data6, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(header5, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data5, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header6, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data6, nil).Once() events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 6) require.Len(t, events, 2, "expected two events for heights 5 and 6") @@ -248,13 +249,13 @@ func TestP2PHandler_ProcessDataRange_HeaderValidateHeaderFails(t *testing.T) { // Data exists at height 3 blockData := makeData(p2pData.Genesis.ChainID, 3, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(3)).Return(blockData, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(blockData, nil).Once() // Header proposer does not match genesis -> validateHeader should fail badAddr, pub, signer := buildTestSigner(t) require.NotEqual(t, string(p2pData.Genesis.ProposerAddress), string(badAddr), "negative test requires mismatched proposer") badHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 3, badAddr, pub, signer) - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(3)).Return(badHeader, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(badHeader, nil).Once() events := p2pData.Handler.ProcessDataRange(ctx, 3, 3) require.Len(t, events, 0, "validateHeader failure should drop event") diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 35b144088a..4801716e16 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -25,6 +25,7 @@ import ( type daRetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) } + type p2pHandler interface { ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent @@ -53,9 +54,9 @@ type Syncer struct { // DA state daHeight uint64 - // P2P stores - headerStore common.Broadcaster[*types.SignedHeader] - dataStore common.Broadcaster[*types.Data] + // P2P handling + headerBroadcaster common.Broadcaster[*types.SignedHeader] + dataBroadcaster common.Broadcaster[*types.Data] // Channels for coordination heightInCh chan common.DAHeightEvent @@ -83,27 +84,27 @@ func NewSyncer( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerStore common.Broadcaster[*types.SignedHeader], - dataStore common.Broadcaster[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, ) *Syncer { return &Syncer{ - store: store, - exec: exec, - da: da, - cache: cache, - metrics: metrics, - config: config, - genesis: genesis, - options: options, - headerStore: headerStore, - dataStore: dataStore, - lastStateMtx: &sync.RWMutex{}, - heightInCh: make(chan common.DAHeightEvent, 10_000), - errorCh: errorCh, - logger: logger.With().Str("component", "syncer").Logger(), + store: store, + exec: exec, + da: da, + cache: cache, + metrics: metrics, + config: config, + genesis: genesis, + options: options, + headerBroadcaster: headerBroadcaster, + dataBroadcaster: dataBroadcaster, + lastStateMtx: &sync.RWMutex{}, + heightInCh: make(chan common.DAHeightEvent, 10_000), + errorCh: errorCh, + logger: logger.With().Str("component", "syncer").Logger(), } } @@ -118,7 +119,7 @@ func (s *Syncer) Start(ctx context.Context) error { // Initialize handlers s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger) - s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.genesis, s.options, s.logger) + s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.genesis, s.options, s.logger) // Start main processing loop s.wg.Add(1) @@ -327,7 +328,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block select { case <-blockTicker: // Process headers - newHeaderHeight := s.headerStore.Store().Height() + newHeaderHeight := s.headerBroadcaster.Store().Height() if newHeaderHeight > *lastHeaderHeight { events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) for _, event := range events { @@ -344,7 +345,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block } // Process data - newDataHeight := s.dataStore.Store().Height() + newDataHeight := s.dataBroadcaster.Store().Height() if newDataHeight == newHeaderHeight { *lastDataHeight = newDataHeight } else if newDataHeight > *lastDataHeight { @@ -376,7 +377,6 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { Uint64("height", height). Uint64("da_height", event.DaHeight). Str("hash", headerHash). - Str("source", string(event.Source)). Msg("processing height event") currentHeight, err := s.store.Height(s.ctx) @@ -409,23 +409,14 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { return } - // Only save to P2P stores if the event came from DA - // P2P events are already in the P2P stores, so we don't need to write them back + // only save to p2p stores if the event came from DA if event.Source == common.SourceDA { g, ctx := errgroup.WithContext(s.ctx) - g.Go(func() error { return s.headerStore.WriteToStoreAndBroadcast(ctx, event.Header) }) - // we only need to save data if it's not empty - if !bytes.Equal(event.Header.Hash(), common.DataHashForEmptyTxs) { - g.Go(func() error { return s.dataStore.WriteToStoreAndBroadcast(ctx, event.Data) }) - } + g.Go(func() error { return s.headerBroadcaster.WriteToStoreAndBroadcast(ctx, event.Header) }) + g.Go(func() error { return s.dataBroadcaster.WriteToStoreAndBroadcast(ctx, event.Data) }) if err := g.Wait(); err != nil { s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store") } - } else if event.Source == common.SourceP2P { - s.logger.Debug(). - Uint64("height", event.Header.Height()). - Str("source", string(event.Source)). - Msg("skipping P2P store write for P2P-sourced event") } } diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index bd1eb6c663..ecaad863b0 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + goheader "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -24,6 +25,19 @@ import ( "github.com/evstack/ev-node/types" ) +// mockBroadcaster for testing +type mockBroadcaster[T goheader.Header[T]] struct { + store goheader.Store[T] +} + +func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error { + return nil +} + +func (m *mockBroadcaster[T]) Store() goheader.Store[T] { + return m.store +} + // TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff // behavior when encountering different types of DA layer errors. func TestSyncer_BackoffOnDAError(t *testing.T) { @@ -76,11 +90,11 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time callCount := 0 @@ -167,11 +181,11 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time @@ -253,11 +267,11 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time @@ -335,8 +349,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { common.NopMetrics(), cfg, gen, - &mocks.MockStore[*types.SignedHeader]{}, - &mocks.MockStore[*types.Data]{}, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 28cf2af22d..355187ceea 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -108,8 +108,8 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay common.NopMetrics(), cfg, gen, - nil, // headerStore not used; we inject P2P directly to channel when needed - nil, // dataStore not used + nil, // we inject P2P directly to channel when needed + nil, // injected when needed zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -152,9 +152,9 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b) headerP2PStore.On("Height").Return(uint64(0)).Maybe() - s.headerStore = headerP2PStore + s.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerP2PStore} dataP2PStore := mocks.NewMockStore[*types.Data](b) dataP2PStore.On("Height").Return(uint64(0)).Maybe() - s.dataStore = dataP2PStore + s.dataBroadcaster = &mockBroadcaster[*types.Data]{dataP2PStore} return &benchFixture{s: s, st: st, cm: cm, cancel: cancel} } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index cf0c7d3c44..067ca04319 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -13,7 +13,6 @@ import ( signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" testmocks "github.com/evstack/ev-node/test/mocks" - mocks "github.com/evstack/ev-node/test/mocks/external" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" @@ -108,8 +107,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.NopMetrics(), cfg, gen, - &mocks.MockStore[*types.SignedHeader]{}, - &mocks.MockStore[*types.Data]{}, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -148,11 +147,6 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) - mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} - mockP2PHeaderStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() - mockP2PDataStore := &mocks.MockStore[*types.Data]{} - mockP2PDataStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() - s := NewSyncer( st, mockExec, @@ -161,8 +155,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NopMetrics(), cfg, gen, - mockP2PHeaderStore, - mockP2PDataStore, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -203,11 +197,6 @@ func TestSequentialBlockSync(t *testing.T) { mockExec := testmocks.NewMockExecutor(t) - mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} - mockP2PHeaderStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() - mockP2PDataStore := &mocks.MockStore[*types.Data]{} - mockP2PDataStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() - s := NewSyncer( st, mockExec, @@ -216,8 +205,8 @@ func TestSequentialBlockSync(t *testing.T) { common.NopMetrics(), cfg, gen, - mockP2PHeaderStore, - mockP2PDataStore, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -335,11 +324,6 @@ func TestSyncLoopPersistState(t *testing.T) { dummyExec := execution.NewDummyExecutor() - mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} - mockP2PHeaderStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() - mockP2PDataStore := &mocks.MockStore[*types.Data]{} - mockP2PDataStore.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() - syncerInst1 := NewSyncer( st, dummyExec, @@ -348,8 +332,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - mockP2PHeaderStore, - mockP2PDataStore, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -417,11 +401,6 @@ func TestSyncLoopPersistState(t *testing.T) { require.NoError(t, err) require.NoError(t, cm.LoadFromDisk()) - mockP2PHeaderStore2 := &mocks.MockStore[*types.SignedHeader]{} - mockP2PHeaderStore2.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() - mockP2PDataStore2 := &mocks.MockStore[*types.Data]{} - mockP2PDataStore2.On("Append", mock.Anything, mock.Anything).Return(nil).Maybe() - syncerInst2 := NewSyncer( st, dummyExec, @@ -430,8 +409,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - mockP2PHeaderStore2, - mockP2PDataStore2, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), From 894f8777750696ee628ee4e80f734e630cb26e48 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 22:25:43 +0200 Subject: [PATCH 12/16] speedup processing --- block/internal/syncing/syncer.go | 97 ++++++++++++++++++-------------- 1 file changed, 54 insertions(+), 43 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 4801716e16..d2ece8599b 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -244,20 +244,36 @@ func (s *Syncer) syncLoop() { // Backoff control when DA replies with errors nextDARequestAt := &time.Time{} - blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration) - defer blockTicker.Stop() - for { + wg := sync.WaitGroup{} + select { case <-s.ctx.Done(): return default: } - // Process pending events from cache on every iteration - s.processPendingEvents() - - fetchedP2pEvent := s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker.C) - fetchedDaEvent := s.tryFetchFromDA(nextDARequestAt) + wg.Add(1) + go func() { + defer wg.Done() + s.processPendingEvents() + }() + + wg.Add(1) + fetchedP2pEvent := false + go func() { + defer wg.Done() + fetchedP2pEvent = s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight) + }() + + wg.Add(1) + fetchedDaEvent := false + go func() { + defer wg.Done() + fetchedDaEvent = s.tryFetchFromDA(nextDARequestAt) + }() + + // wait for pending events processing, p2p and da fetching + wg.Wait() // Prevent busy-waiting when no events are available if !fetchedDaEvent && !fetchedP2pEvent { @@ -322,48 +338,43 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { // tryFetchFromP2P attempts to fetch events from P2P stores. // It processes both header and data ranges when the block ticker fires. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, blockTicker <-chan time.Time) bool { +func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64) bool { eventsProcessed := false - select { - case <-blockTicker: - // Process headers - newHeaderHeight := s.headerBroadcaster.Store().Height() - if newHeaderHeight > *lastHeaderHeight { - events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - *lastHeaderHeight = newHeaderHeight - if len(events) > 0 { - eventsProcessed = true + // Process headers + newHeaderHeight := s.headerBroadcaster.Store().Height() + if newHeaderHeight > *lastHeaderHeight { + events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) + for _, event := range events { + select { + case s.heightInCh <- event: + default: + s.cache.SetPendingEvent(event.Header.Height(), &event) } } + *lastHeaderHeight = newHeaderHeight + if len(events) > 0 { + eventsProcessed = true + } + } - // Process data - newDataHeight := s.dataBroadcaster.Store().Height() - if newDataHeight == newHeaderHeight { - *lastDataHeight = newDataHeight - } else if newDataHeight > *lastDataHeight { - events := s.p2pHandler.ProcessDataRange(s.ctx, *lastDataHeight+1, newDataHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - *lastDataHeight = newDataHeight - if len(events) > 0 { - eventsProcessed = true + // Process data + newDataHeight := s.dataBroadcaster.Store().Height() + if newDataHeight == newHeaderHeight { + *lastDataHeight = newDataHeight + } else if newDataHeight > *lastDataHeight { + events := s.p2pHandler.ProcessDataRange(s.ctx, *lastDataHeight+1, newDataHeight) + for _, event := range events { + select { + case s.heightInCh <- event: + default: + s.cache.SetPendingEvent(event.Header.Height(), &event) } } - default: - // No P2P events available + *lastDataHeight = newDataHeight + if len(events) > 0 { + eventsProcessed = true + } } return eventsProcessed From bc2eb117cf7b406cfea0baea5d6f33674338493a Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 6 Oct 2025 22:42:09 +0200 Subject: [PATCH 13/16] prevent double fetching --- block/internal/syncing/syncer.go | 33 +++++++++++---------------- block/internal/syncing/syncer_test.go | 17 ++++++++++---- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index d2ece8599b..25bf75e7ef 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -232,15 +232,6 @@ func (s *Syncer) syncLoop() { } } - initialHeight, err := s.store.Height(s.ctx) - if err != nil { - s.logger.Error().Err(err).Msg("failed to get initial height") - return - } - - lastHeaderHeight := &initialHeight - lastDataHeight := &initialHeight - // Backoff control when DA replies with errors nextDARequestAt := &time.Time{} @@ -262,7 +253,7 @@ func (s *Syncer) syncLoop() { fetchedP2pEvent := false go func() { defer wg.Done() - fetchedP2pEvent = s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight) + fetchedP2pEvent = s.tryFetchFromP2P() }() wg.Add(1) @@ -338,13 +329,19 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { // tryFetchFromP2P attempts to fetch events from P2P stores. // It processes both header and data ranges when the block ticker fires. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64) bool { +func (s *Syncer) tryFetchFromP2P() bool { eventsProcessed := false + currentHeight, err := s.store.Height(s.ctx) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get current height") + return eventsProcessed + } + // Process headers newHeaderHeight := s.headerBroadcaster.Store().Height() - if newHeaderHeight > *lastHeaderHeight { - events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) + if newHeaderHeight > currentHeight { + events := s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight) for _, event := range events { select { case s.heightInCh <- event: @@ -352,18 +349,15 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64) bool s.cache.SetPendingEvent(event.Header.Height(), &event) } } - *lastHeaderHeight = newHeaderHeight if len(events) > 0 { eventsProcessed = true } } - // Process data + // Process data (if not already processed by headers) newDataHeight := s.dataBroadcaster.Store().Height() - if newDataHeight == newHeaderHeight { - *lastDataHeight = newDataHeight - } else if newDataHeight > *lastDataHeight { - events := s.p2pHandler.ProcessDataRange(s.ctx, *lastDataHeight+1, newDataHeight) + if newDataHeight != newHeaderHeight && newDataHeight > currentHeight { + events := s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight) for _, event := range events { select { case s.heightInCh <- event: @@ -371,7 +365,6 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64) bool s.cache.SetPendingEvent(event.Header.Height(), &event) } } - *lastDataHeight = newDataHeight if len(events) > 0 { eventsProcessed = true } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 067ca04319..0b000c00b5 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -13,6 +13,7 @@ import ( signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" testmocks "github.com/evstack/ev-node/test/mocks" + mocks "github.com/evstack/ev-node/test/mocks/external" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" @@ -323,6 +324,10 @@ func TestSyncLoopPersistState(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr, DAStartHeight: myDAHeightOffset} dummyExec := execution.NewDummyExecutor() + mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} + mockP2PDataStore := &mocks.MockStore[*types.Data]{} + mockP2PHeaderStore.On("Height", mock.Anything).Return(uint64(1), nil).Maybe() + mockP2PDataStore.On("Height", mock.Anything).Return(uint64(1), nil).Maybe() syncerInst1 := NewSyncer( st, @@ -332,8 +337,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - &mockBroadcaster[*types.SignedHeader]{}, - &mockBroadcaster[*types.Data]{}, + &mockBroadcaster[*types.SignedHeader]{mockP2PHeaderStore}, + &mockBroadcaster[*types.Data]{mockP2PDataStore}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -343,6 +348,8 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) syncerInst1.ctx = ctx daRtrMock, p2pHndlMock := newMockdaRetriever(t), newMockp2pHandler(t) + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock // with n da blobs fetched @@ -409,8 +416,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - &mockBroadcaster[*types.SignedHeader]{}, - &mockBroadcaster[*types.Data]{}, + &mockBroadcaster[*types.SignedHeader]{mockP2PHeaderStore}, + &mockBroadcaster[*types.Data]{mockP2PDataStore}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -422,6 +429,8 @@ func TestSyncLoopPersistState(t *testing.T) { t.Cleanup(cancel) syncerInst2.ctx = ctx daRtrMock, p2pHndlMock = newMockdaRetriever(t), newMockp2pHandler(t) + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() syncerInst2.daRetriever, syncerInst2.p2pHandler = daRtrMock, p2pHndlMock daRtrMock.On("RetrieveFromDA", mock.Anything, mock.Anything). From 75bb6c0a56bd46ac5dec914f8d7bae8c15b15b8d Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 7 Oct 2025 09:52:26 +0200 Subject: [PATCH 14/16] submit p2p heights directly --- block/internal/syncing/da_retriever.go | 3 - block/internal/syncing/da_retriever_test.go | 24 +++---- block/internal/syncing/p2p_handler.go | 43 +++++++----- block/internal/syncing/p2p_handler_test.go | 2 +- block/internal/syncing/syncer.go | 62 ++++------------ block/internal/syncing/syncer_mock.go | 78 +++++++++------------ block/internal/syncing/syncer_test.go | 8 +-- 7 files changed, 88 insertions(+), 132 deletions(-) diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index 33ae79bca6..08bc5d917f 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -27,7 +27,6 @@ type DARetriever struct { da coreda.DA cache cache.Manager genesis genesis.Genesis - options common.BlockOptions logger zerolog.Logger // calculate namespaces bytes once and reuse them @@ -46,14 +45,12 @@ func NewDARetriever( cache cache.Manager, config config.Config, genesis genesis.Genesis, - options common.BlockOptions, logger zerolog.Logger, ) *DARetriever { return &DARetriever{ da: da, cache: cache, genesis: genesis, - options: options, logger: logger.With().Str("component", "da_retriever").Logger(), namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 22b27dcbc5..e1fade2937 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -59,7 +59,7 @@ func TestDARetriever_RetrieveFromDA_Invalid(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, errors.New("just invalid")).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) assert.Error(t, err) assert.Len(t, events, 0) @@ -77,7 +77,7 @@ func TestDARetriever_RetrieveFromDA_NotFound(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: whatever", coreda.ErrBlobNotFound.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) assert.True(t, errors.Is(err, coreda.ErrBlobNotFound)) assert.Len(t, events, 0) @@ -94,7 +94,7 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: later", coreda.ErrHeightFromFuture.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, derr := r.RetrieveFromDA(context.Background(), 1000) assert.Error(t, derr) assert.True(t, errors.Is(derr, coreda.ErrHeightFromFuture)) @@ -116,7 +116,7 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) { }). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) start := time.Now() events, err := r.RetrieveFromDA(context.Background(), 42) @@ -145,7 +145,7 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) @@ -165,7 +165,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data) @@ -193,7 +193,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Header with no data hash present should trigger empty data creation (per current logic) hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil) @@ -221,7 +221,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil) gotH := r.tryDecodeHeader(hb, 123) @@ -257,7 +257,7 @@ func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { goodAddr, pub, signer := buildSyncTestSigner(t) badAddr := []byte("not-the-proposer") gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: badAddr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Signed data is made by goodAddr; retriever expects badAddr -> should be rejected db, _ := makeSignedDataBytes(t, gen.ChainID, 7, goodAddr, pub, signer, 1) @@ -310,7 +310,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { mockDA.EXPECT().Get(mock.Anything, mock.Anything, mock.MatchedBy(func(ns []byte) bool { return bytes.Equal(ns, namespaceDataBz) })). Return([][]byte{dataBin}, nil).Once() - r := NewDARetriever(mockDA, cm, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, cfg, gen, zerolog.Nop()) events, derr := r.RetrieveFromDA(context.Background(), 1234) require.NoError(t, derr) @@ -328,7 +328,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Create header and data for the same block height but from different DA heights dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2) @@ -364,7 +364,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Create multiple headers and data for different block heights data3Bin, data3 := makeSignedDataBytes(t, gen.ChainID, 3, addr, pub, signer, 1) diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index c3f3f0efab..764c5ad5a6 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -10,6 +10,7 @@ import ( goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" + "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/types" @@ -19,8 +20,8 @@ import ( type P2PHandler struct { headerStore goheader.Store[*types.SignedHeader] dataStore goheader.Store[*types.Data] + cache cache.Manager genesis genesis.Genesis - options common.BlockOptions logger zerolog.Logger } @@ -28,31 +29,29 @@ type P2PHandler struct { func NewP2PHandler( headerStore goheader.Store[*types.SignedHeader], dataStore goheader.Store[*types.Data], + cache cache.Manager, genesis genesis.Genesis, - options common.BlockOptions, logger zerolog.Logger, ) *P2PHandler { return &P2PHandler{ headerStore: headerStore, dataStore: dataStore, + cache: cache, genesis: genesis, - options: options, logger: logger.With().Str("component", "p2p_handler").Logger(), } } // ProcessHeaderRange processes headers from the header store within the given range -func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent { +func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { if startHeight > endHeight { - return nil + return } - var events []common.DAHeightEvent - for height := startHeight; height <= endHeight; height++ { select { case <-ctx.Done(): - return events + return default: } @@ -66,7 +65,7 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later") // Don't continue processing further heights if we timeout on one // This prevents blocking on sequential heights - return events + return } h.logger.Debug().Uint64("height", height).Err(err).Msg("failed to get header from store") continue @@ -113,26 +112,28 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei Source: common.SourceP2P, } - events = append(events, event) + select { + case heightInCh <- event: + default: + h.cache.SetPendingEvent(event.Header.Height(), &event) + } h.logger.Debug().Uint64("height", height).Str("source", "p2p_headers").Msg("processed header from P2P") } - return events + return } // ProcessDataRange processes data from the data store within the given range -func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent { +func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { if startHeight > endHeight { - return nil + return } - var events []common.DAHeightEvent - for height := startHeight; height <= endHeight; height++ { select { case <-ctx.Done(): - return events + return default: } @@ -146,7 +147,7 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later") // Don't continue processing further heights if we timeout on one // This prevents blocking on sequential heights - return events + return } h.logger.Debug().Uint64("height", height).Err(err).Msg("failed to get data from store") continue @@ -184,12 +185,16 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh Source: common.SourceP2P, } - events = append(events, event) + select { + case heightInCh <- event: + default: + h.cache.SetPendingEvent(event.Header.Height(), &event) + } h.logger.Debug().Uint64("height", height).Str("source", "p2p_data").Msg("processed data from P2P") } - return events + return } // assertExpectedProposer validates the proposer address diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index ccce5f1be1..73d1554846 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -73,7 +73,7 @@ func setupP2P(t *testing.T) *P2PTestData { headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) dataStoreMock := extmocks.NewMockStore[*types.Data](t) - handler := NewP2PHandler(headerStoreMock, dataStoreMock, gen, common.DefaultBlockOptions(), zerolog.Nop()) + handler := NewP2PHandler(headerStoreMock, dataStoreMock, gen, zerolog.Nop()) return &P2PTestData{ Handler: handler, HeaderStore: headerStoreMock, diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 25bf75e7ef..21bda54c7f 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -27,8 +27,8 @@ type daRetriever interface { } type p2pHandler interface { - ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent - ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent + ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) + ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) } // Syncer handles block synchronization from DA and P2P sources. @@ -118,8 +118,8 @@ func (s *Syncer) Start(ctx context.Context) error { } // Initialize handlers - s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger) - s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.genesis, s.options, s.logger) + s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger) + s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.cache, s.genesis, s.logger) // Start main processing loop s.wg.Add(1) @@ -250,39 +250,32 @@ func (s *Syncer) syncLoop() { }() wg.Add(1) - fetchedP2pEvent := false go func() { defer wg.Done() - fetchedP2pEvent = s.tryFetchFromP2P() + s.tryFetchFromP2P() }() wg.Add(1) - fetchedDaEvent := false go func() { defer wg.Done() - fetchedDaEvent = s.tryFetchFromDA(nextDARequestAt) + s.tryFetchFromDA(nextDARequestAt) }() // wait for pending events processing, p2p and da fetching wg.Wait() - - // Prevent busy-waiting when no events are available - if !fetchedDaEvent && !fetchedP2pEvent { - time.Sleep(min(10*time.Millisecond, s.config.Node.BlockTime.Duration)) - } } } // tryFetchFromDA attempts to fetch events from the DA layer. // It handles backoff timing, DA height management, and error classification. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { +func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) { now := time.Now() daHeight := s.GetDAHeight() // Respect backoff window if set if !nextDARequestAt.IsZero() && now.Before(*nextDARequestAt) { - return false + return } // Retrieve from DA as fast as possible (unless throttled by HFF) @@ -294,7 +287,7 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { s.SetDAHeight(daHeight + 1) // Reset backoff on success *nextDARequestAt = time.Time{} - return false + return } // Back off exactly by DA block time to avoid overloading @@ -305,8 +298,7 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { *nextDARequestAt = now.Add(backoffDelay) s.logger.Error().Err(err).Dur("delay", backoffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests") - - return false + return } // Reset backoff on success @@ -323,54 +315,30 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { // increment DA height on successful retrieval s.SetDAHeight(daHeight + 1) - return len(events) > 0 + return } // tryFetchFromP2P attempts to fetch events from P2P stores. // It processes both header and data ranges when the block ticker fires. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromP2P() bool { - eventsProcessed := false - +func (s *Syncer) tryFetchFromP2P() { currentHeight, err := s.store.Height(s.ctx) if err != nil { s.logger.Error().Err(err).Msg("failed to get current height") - return eventsProcessed + return } // Process headers newHeaderHeight := s.headerBroadcaster.Store().Height() if newHeaderHeight > currentHeight { - events := s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - if len(events) > 0 { - eventsProcessed = true - } + s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight, s.heightInCh) } // Process data (if not already processed by headers) newDataHeight := s.dataBroadcaster.Store().Height() if newDataHeight != newHeaderHeight && newDataHeight > currentHeight { - events := s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - if len(events) > 0 { - eventsProcessed = true - } + s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh) } - - return eventsProcessed } func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { diff --git a/block/internal/syncing/syncer_mock.go b/block/internal/syncing/syncer_mock.go index 2413db59cd..85cad46960 100644 --- a/block/internal/syncing/syncer_mock.go +++ b/block/internal/syncing/syncer_mock.go @@ -134,22 +134,9 @@ func (_m *mockp2pHandler) EXPECT() *mockp2pHandler_Expecter { } // ProcessDataRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent { - ret := _mock.Called(ctx, fromHeight, toHeight) - - if len(ret) == 0 { - panic("no return value specified for ProcessDataRange") - } - - var r0 []common.DAHeightEvent - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, fromHeight, toHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - return r0 +func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { + _mock.Called(ctx, fromHeight, toHeight, heightInCh) + return } // mockp2pHandler_ProcessDataRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessDataRange' @@ -161,11 +148,12 @@ type mockp2pHandler_ProcessDataRange_Call struct { // - ctx context.Context // - fromHeight uint64 // - toHeight uint64 -func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}) *mockp2pHandler_ProcessDataRange_Call { - return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight)} +// - heightInCh chan<- common.DAHeightEvent +func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessDataRange_Call { + return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight, heightInCh)} } -func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64)) *mockp2pHandler_ProcessDataRange_Call { +func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -179,42 +167,34 @@ func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 chan<- common.DAHeightEvent + if args[3] != nil { + arg3 = args[3].(chan<- common.DAHeightEvent) + } run( arg0, arg1, arg2, + arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessDataRange_Call) Return(dAHeightEvents []common.DAHeightEvent) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return(dAHeightEvents) +func (_c *mockp2pHandler_ProcessDataRange_Call) Return() *mockp2pHandler_ProcessDataRange_Call { + _c.Call.Return() return _c } -func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return(run) +func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { + _c.Run(run) return _c } // ProcessHeaderRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent { - ret := _mock.Called(ctx, fromHeight, toHeight) - - if len(ret) == 0 { - panic("no return value specified for ProcessHeaderRange") - } - - var r0 []common.DAHeightEvent - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, fromHeight, toHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - return r0 +func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { + _mock.Called(ctx, fromHeight, toHeight, heightInCh) + return } // mockp2pHandler_ProcessHeaderRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessHeaderRange' @@ -226,11 +206,12 @@ type mockp2pHandler_ProcessHeaderRange_Call struct { // - ctx context.Context // - fromHeight uint64 // - toHeight uint64 -func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}) *mockp2pHandler_ProcessHeaderRange_Call { - return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight)} +// - heightInCh chan<- common.DAHeightEvent +func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessHeaderRange_Call { + return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight, heightInCh)} } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64)) *mockp2pHandler_ProcessHeaderRange_Call { +func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -244,21 +225,26 @@ func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Conte if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 chan<- common.DAHeightEvent + if args[3] != nil { + arg3 = args[3].(chan<- common.DAHeightEvent) + } run( arg0, arg1, arg2, + arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return(dAHeightEvents []common.DAHeightEvent) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return(dAHeightEvents) +func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return() *mockp2pHandler_ProcessHeaderRange_Call { + _c.Call.Return() return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return(run) +func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { + _c.Run(run) return _c } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 0b000c00b5..5b2295a3b4 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -348,8 +348,8 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) syncerInst1.ctx = ctx daRtrMock, p2pHndlMock := newMockdaRetriever(t), newMockp2pHandler(t) - p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock // with n da blobs fetched @@ -429,8 +429,8 @@ func TestSyncLoopPersistState(t *testing.T) { t.Cleanup(cancel) syncerInst2.ctx = ctx daRtrMock, p2pHndlMock = newMockdaRetriever(t), newMockp2pHandler(t) - p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst2.daRetriever, syncerInst2.p2pHandler = daRtrMock, p2pHndlMock daRtrMock.On("RetrieveFromDA", mock.Anything, mock.Anything). From f9a99280e94afacb835fa41e2969c390c248188b Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 7 Oct 2025 11:33:10 +0200 Subject: [PATCH 15/16] fix tests --- block/internal/syncing/p2p_handler_test.go | 85 ++++++++++++++++++++-- block/internal/syncing/syncer_test.go | 8 +- 2 files changed, 81 insertions(+), 12 deletions(-) diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 73d1554846..2265b53975 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -7,15 +7,19 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" + storemocks "github.com/evstack/ev-node/test/mocks" extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -57,13 +61,14 @@ type P2PTestData struct { Handler *P2PHandler HeaderStore *extmocks.MockStore[*types.SignedHeader] DataStore *extmocks.MockStore[*types.Data] + Cache cache.Manager Genesis genesis.Genesis ProposerAddr []byte ProposerPub crypto.PubKey Signer signerpkg.Signer } -// setupP2P constructs a P2PHandler with mocked go-header stores +// setupP2P constructs a P2PHandler with mocked go-header stores and real cache func setupP2P(t *testing.T) *P2PTestData { t.Helper() proposerAddr, proposerPub, signer := buildTestSigner(t) @@ -73,11 +78,28 @@ func setupP2P(t *testing.T) *P2PTestData { headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) dataStoreMock := extmocks.NewMockStore[*types.Data](t) - handler := NewP2PHandler(headerStoreMock, dataStoreMock, gen, zerolog.Nop()) + // Create a real cache manager for tests + storeMock := storemocks.NewMockStore(t) + // Mock the methods that cache manager initialization will call + // Return ErrNotFound for non-existent metadata keys + storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-header-height").Return(nil, ds.ErrNotFound).Maybe() + storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-data-height").Return(nil, ds.ErrNotFound).Maybe() + storeMock.EXPECT().Height(mock.Anything).Return(uint64(0), nil).Maybe() + storeMock.EXPECT().SetMetadata(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + + cfg := config.Config{ + RootDir: t.TempDir(), + ClearCache: true, + } + cacheManager, err := cache.NewManager(cfg, storeMock, zerolog.Nop()) + require.NoError(t, err, "failed to create cache manager") + + handler := NewP2PHandler(headerStoreMock, dataStoreMock, cacheManager, gen, zerolog.Nop()) return &P2PTestData{ Handler: handler, HeaderStore: headerStoreMock, DataStore: dataStoreMock, + Cache: cacheManager, Genesis: gen, ProposerAddr: proposerAddr, ProposerPub: proposerPub, @@ -85,6 +107,29 @@ func setupP2P(t *testing.T) *P2PTestData { } } +// collectEvents reads events from a channel with a timeout +func collectEvents(t *testing.T, ch <-chan common.DAHeightEvent, timeout time.Duration) []common.DAHeightEvent { + t.Helper() + var events []common.DAHeightEvent + deadline := time.After(timeout) + for { + select { + case event := <-ch: + events = append(events, event) + case <-deadline: + return events + case <-time.After(10 * time.Millisecond): + // Give it a moment to check if more events are coming + select { + case event := <-ch: + events = append(events, event) + default: + return events + } + } + } +} + func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { p2pData := setupP2P(t) ctx := context.Background() @@ -108,7 +153,11 @@ func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(signedHeader, nil).Once() p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(blockData, nil).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 5) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 5, 5, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 1, "expected one event for the provided header/data height") require.Equal(t, uint64(5), events[0].Header.Height()) require.NotNil(t, events[0].Data) @@ -129,7 +178,11 @@ func TestP2PHandler_ProcessHeaderRange_MissingData_NonEmptyHash(t *testing.T) { p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(signedHeader, nil).Once() p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("not found")).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 7, 7) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 7, 7, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -141,7 +194,11 @@ func TestP2PHandler_ProcessDataRange_HeaderMissing(t *testing.T) { p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(blockData, nil).Once() p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("no header")).Once() - events := p2pData.Handler.ProcessDataRange(ctx, 9, 9) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessDataRange(ctx, 9, 9, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -157,7 +214,11 @@ func TestP2PHandler_ProposerMismatch_Rejected(t *testing.T) { p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(4)).Return(signedHeader, nil).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 4, 4) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 4, 4, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -235,7 +296,11 @@ func TestP2PHandler_ProcessHeaderRange_MultipleHeightsHappyPath(t *testing.T) { p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header6, nil).Once() p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data6, nil).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 6) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 5, 6, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 2, "expected two events for heights 5 and 6") require.Equal(t, uint64(5), events[0].Header.Height(), "first event should be height 5") require.Equal(t, uint64(6), events[1].Header.Height(), "second event should be height 6") @@ -257,6 +322,10 @@ func TestP2PHandler_ProcessDataRange_HeaderValidateHeaderFails(t *testing.T) { badHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 3, badAddr, pub, signer) p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(badHeader, nil).Once() - events := p2pData.Handler.ProcessDataRange(ctx, 3, 3) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessDataRange(ctx, 3, 3, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0, "validateHeader failure should drop event") } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 5b2295a3b4..a326950bd1 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -348,8 +348,8 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) syncerInst1.ctx = ctx daRtrMock, p2pHndlMock := newMockdaRetriever(t), newMockp2pHandler(t) - p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() - p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock // with n da blobs fetched @@ -429,8 +429,8 @@ func TestSyncLoopPersistState(t *testing.T) { t.Cleanup(cancel) syncerInst2.ctx = ctx daRtrMock, p2pHndlMock = newMockdaRetriever(t), newMockp2pHandler(t) - p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() - p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst2.daRetriever, syncerInst2.p2pHandler = daRtrMock, p2pHndlMock daRtrMock.On("RetrieveFromDA", mock.Anything, mock.Anything). From 1a800bf6535d2a9401c9eb9df609c0e1a4cb6f79 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 7 Oct 2025 12:36:47 +0200 Subject: [PATCH 16/16] updates --- block/internal/syncing/p2p_handler.go | 4 ---- block/internal/syncing/syncer.go | 1 - 2 files changed, 5 deletions(-) diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index 764c5ad5a6..ed14ebd82e 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -120,8 +120,6 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei h.logger.Debug().Uint64("height", height).Str("source", "p2p_headers").Msg("processed header from P2P") } - - return } // ProcessDataRange processes data from the data store within the given range @@ -193,8 +191,6 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh h.logger.Debug().Uint64("height", height).Str("source", "p2p_data").Msg("processed data from P2P") } - - return } // assertExpectedProposer validates the proposer address diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 3a55764fc8..ae64827d9f 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -320,7 +320,6 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) { // increment DA height on successful retrieval s.SetDAHeight(daHeight + 1) - return } // tryFetchFromP2P attempts to fetch events from P2P stores.