Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"errors"
"fmt"

goheader "github.com/celestiaorg/go-header"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/executing"
"github.com/evstack/ev-node/block/internal/reaping"
"github.com/evstack/ev-node/block/internal/submitting"
Expand Down Expand Up @@ -122,11 +122,6 @@ func (bc *Components) Stop() error {
return errs
}

// broadcaster interface for P2P broadcasting
type broadcaster[T any] interface {
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
}

// NewSyncComponents creates components for a non-aggregator full node that can only sync blocks.
// Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA.
// They have more sync capabilities than light nodes but no block production. No signer required.
Expand All @@ -136,8 +131,8 @@ func NewSyncComponents(
store store.Store,
exec coreexecutor.Executor,
da coreda.DA,
headerStore goheader.Store[*types.SignedHeader],
dataStore goheader.Store[*types.Data],
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
Expand All @@ -158,8 +153,8 @@ func NewSyncComponents(
metrics,
config,
genesis,
headerStore,
dataStore,
headerBroadcaster,
dataBroadcaster,
logger,
blockOpts,
errorCh,
Expand Down Expand Up @@ -199,8 +194,8 @@ func NewAggregatorComponents(
sequencer coresequencer.Sequencer,
da coreda.DA,
signer signer.Signer,
headerBroadcaster broadcaster[*types.SignedHeader],
dataBroadcaster broadcaster[*types.Data],
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
Expand Down
13 changes: 13 additions & 0 deletions block/internal/common/expected_interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common

import (
"context"

goheader "github.com/celestiaorg/go-header"
)

// Broadcaster is the expected interface for components that both persist
// headers/data to a go-header store and gossip them over the P2P network.
// It also exposes the underlying store so consumers (e.g. the syncer) can
// query heights and ranges without holding a separate store reference.
type Broadcaster[H goheader.Header[H]] interface {
	// WriteToStoreAndBroadcast persists payload to the underlying store and
	// broadcasts it to the P2P network.
	WriteToStoreAndBroadcast(ctx context.Context, payload H) error
	// Store returns the go-header store backing this broadcaster.
	Store() goheader.Store[H]
}
13 changes: 4 additions & 9 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ import (
"github.com/evstack/ev-node/types"
)

// broadcaster interface for P2P broadcasting
type broadcaster[T any] interface {
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
}

// Executor handles block production, transaction processing, and state management
type Executor struct {
// Core components
Expand All @@ -41,8 +36,8 @@ type Executor struct {
metrics *common.Metrics

// Broadcasting
headerBroadcaster broadcaster[*types.SignedHeader]
dataBroadcaster broadcaster[*types.Data]
headerBroadcaster common.Broadcaster[*types.SignedHeader]
dataBroadcaster common.Broadcaster[*types.Data]

// Configuration
config config.Config
Expand Down Expand Up @@ -81,8 +76,8 @@ func NewExecutor(
metrics *common.Metrics,
config config.Config,
genesis genesis.Genesis,
headerBroadcaster broadcaster[*types.SignedHeader],
dataBroadcaster broadcaster[*types.Data],
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
logger zerolog.Logger,
options common.BlockOptions,
errorCh chan<- error,
Expand Down
7 changes: 6 additions & 1 deletion block/internal/executing/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

goheader "github.com/celestiaorg/go-header"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/rs/zerolog"
Expand All @@ -20,7 +21,7 @@ import (
)

// mockBroadcaster for testing
type mockBroadcaster[T any] struct {
type mockBroadcaster[T goheader.Header[T]] struct {
called bool
payload T
}
Expand All @@ -31,6 +32,10 @@ func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, paylo
return nil
}

// Store is required by the common.Broadcaster interface but is never
// exercised by the executor tests, so it deliberately panics to surface any
// unexpected use.
func (m *mockBroadcaster[T]) Store() goheader.Store[T] {
	// Fixed garbled message ("should not need to be needed").
	panic("Store should not be called in this test")
}

func TestExecutor_BroadcasterIntegration(t *testing.T) {
// Create in-memory store
ds := sync.MutexWrap(datastore.NewMapDatastore())
Expand Down
56 changes: 33 additions & 23 deletions block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"sync/atomic"
"time"

goheader "github.com/celestiaorg/go-header"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
Expand All @@ -25,6 +25,7 @@ import (
// daRetriever abstracts fetching block events from the DA layer at a given
// DA height; satisfied by NewDARetriever in production and mocked in tests.
type daRetriever interface {
	RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
}

type p2pHandler interface {
ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
Expand Down Expand Up @@ -53,9 +54,9 @@ type Syncer struct {
// DA state
daHeight uint64

// P2P stores
headerStore goheader.Store[*types.SignedHeader]
dataStore goheader.Store[*types.Data]
// P2P handling
headerBroadcaster common.Broadcaster[*types.SignedHeader]
dataBroadcaster common.Broadcaster[*types.Data]

// Channels for coordination
heightInCh chan common.DAHeightEvent
Expand Down Expand Up @@ -83,27 +84,27 @@ func NewSyncer(
metrics *common.Metrics,
config config.Config,
genesis genesis.Genesis,
headerStore goheader.Store[*types.SignedHeader],
dataStore goheader.Store[*types.Data],
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
logger zerolog.Logger,
options common.BlockOptions,
errorCh chan<- error,
) *Syncer {
return &Syncer{
store: store,
exec: exec,
da: da,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
options: options,
headerStore: headerStore,
dataStore: dataStore,
lastStateMtx: &sync.RWMutex{},
heightInCh: make(chan common.DAHeightEvent, 10_000),
errorCh: errorCh,
logger: logger.With().Str("component", "syncer").Logger(),
store: store,
exec: exec,
da: da,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
options: options,
headerBroadcaster: headerBroadcaster,
dataBroadcaster: dataBroadcaster,
lastStateMtx: &sync.RWMutex{},
heightInCh: make(chan common.DAHeightEvent, 10_000),
errorCh: errorCh,
logger: logger.With().Str("component", "syncer").Logger(),
}
}

Expand All @@ -118,7 +119,7 @@ func (s *Syncer) Start(ctx context.Context) error {

// Initialize handlers
s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger)
s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger)
s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.genesis, s.options, s.logger)

// Start main processing loop
s.wg.Add(1)
Expand Down Expand Up @@ -327,7 +328,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block
select {
case <-blockTicker:
// Process headers
newHeaderHeight := s.headerStore.Height()
newHeaderHeight := s.headerBroadcaster.Store().Height()
if newHeaderHeight > *lastHeaderHeight {
events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight)
for _, event := range events {
Expand All @@ -344,7 +345,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block
}

// Process data
newDataHeight := s.dataStore.Height()
newDataHeight := s.headerBroadcaster.Store().Height()
if newDataHeight == newHeaderHeight {
*lastDataHeight = newDataHeight
} else if newDataHeight > *lastDataHeight {
Expand Down Expand Up @@ -407,6 +408,15 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
}
return
}

// broadcast header and data to P2P network
g, ctx := errgroup.WithContext(s.ctx)
g.Go(func() error { return s.headerBroadcaster.WriteToStoreAndBroadcast(ctx, event.Header) })
g.Go(func() error { return s.dataBroadcaster.WriteToStoreAndBroadcast(ctx, event.Data) })
if err := g.Wait(); err != nil {
s.logger.Error().Err(err).Msg("failed to broadcast header and/data")
// don't fail block production on broadcast error
}
}

// errInvalidBlock is returned when a block is failing validation
Expand Down
30 changes: 22 additions & 8 deletions block/internal/syncing/syncer_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

goheader "github.com/celestiaorg/go-header"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/rs/zerolog"
Expand All @@ -24,6 +25,19 @@ import (
"github.com/evstack/ev-node/types"
)

// mockBroadcaster is a no-op Broadcaster test double that simply wraps an
// injected go-header store.
type mockBroadcaster[T goheader.Header[T]] struct {
	store goheader.Store[T]
}

// Store exposes the wrapped go-header store.
func (b *mockBroadcaster[T]) Store() goheader.Store[T] {
	return b.store
}

// WriteToStoreAndBroadcast is a no-op: these tests never assert on
// broadcast behavior.
func (b *mockBroadcaster[T]) WriteToStoreAndBroadcast(_ context.Context, _ T) error {
	return nil
}

// TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff
// behavior when encountering different types of DA layer errors.
func TestSyncer_BackoffOnDAError(t *testing.T) {
Expand Down Expand Up @@ -76,11 +90,11 @@ func TestSyncer_BackoffOnDAError(t *testing.T) {

headerStore := mocks.NewMockStore[*types.SignedHeader](t)
headerStore.On("Height").Return(uint64(0)).Maybe()
syncer.headerStore = headerStore
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}

dataStore := mocks.NewMockStore[*types.Data](t)
dataStore.On("Height").Return(uint64(0)).Maybe()
syncer.dataStore = dataStore
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}

var callTimes []time.Time
callCount := 0
Expand Down Expand Up @@ -167,11 +181,11 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) {

headerStore := mocks.NewMockStore[*types.SignedHeader](t)
headerStore.On("Height").Return(uint64(0)).Maybe()
syncer.headerStore = headerStore
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}

dataStore := mocks.NewMockStore[*types.Data](t)
dataStore.On("Height").Return(uint64(0)).Maybe()
syncer.dataStore = dataStore
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}

var callTimes []time.Time

Expand Down Expand Up @@ -253,11 +267,11 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) {

headerStore := mocks.NewMockStore[*types.SignedHeader](t)
headerStore.On("Height").Return(uint64(0)).Maybe()
syncer.headerStore = headerStore
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}

dataStore := mocks.NewMockStore[*types.Data](t)
dataStore.On("Height").Return(uint64(0)).Maybe()
syncer.dataStore = dataStore
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}

var callTimes []time.Time

Expand Down Expand Up @@ -335,8 +349,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer {
common.NopMetrics(),
cfg,
gen,
nil,
nil,
&mockBroadcaster[*types.SignedHeader]{},
&mockBroadcaster[*types.Data]{},
zerolog.Nop(),
common.DefaultBlockOptions(),
make(chan error, 1),
Expand Down
8 changes: 4 additions & 4 deletions block/internal/syncing/syncer_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
common.NopMetrics(),
cfg,
gen,
nil, // headerStore not used; we inject P2P directly to channel when needed
nil, // dataStore not used
nil, // we inject P2P directly to channel when needed
nil, // injected when needed
zerolog.Nop(),
common.DefaultBlockOptions(),
make(chan error, 1),
Expand Down Expand Up @@ -152,9 +152,9 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path
headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b)
headerP2PStore.On("Height").Return(uint64(0)).Maybe()
s.headerStore = headerP2PStore
s.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerP2PStore}
dataP2PStore := mocks.NewMockStore[*types.Data](b)
dataP2PStore.On("Height").Return(uint64(0)).Maybe()
s.dataStore = dataP2PStore
s.dataBroadcaster = &mockBroadcaster[*types.Data]{dataP2PStore}
return &benchFixture{s: s, st: st, cm: cm, cancel: cancel}
}
Loading
Loading