diff --git a/block/components.go b/block/components.go index a15781770..115fb4aad 100644 --- a/block/components.go +++ b/block/components.go @@ -5,10 +5,10 @@ import ( "errors" "fmt" - goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" "github.com/evstack/ev-node/block/internal/cache" + "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/executing" "github.com/evstack/ev-node/block/internal/reaping" "github.com/evstack/ev-node/block/internal/submitting" @@ -122,11 +122,6 @@ func (bc *Components) Stop() error { return errs } -// broadcaster interface for P2P broadcasting -type broadcaster[T any] interface { - WriteToStoreAndBroadcast(ctx context.Context, payload T) error -} - // NewSyncComponents creates components for a non-aggregator full node that can only sync blocks. // Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA. // They have more sync capabilities than light nodes but no block production. No signer required. @@ -136,8 +131,8 @@ func NewSyncComponents( store store.Store, exec coreexecutor.Executor, da coreda.DA, - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, @@ -158,8 +153,8 @@ func NewSyncComponents( metrics, config, genesis, - headerStore, - dataStore, + headerBroadcaster, + dataBroadcaster, logger, blockOpts, errorCh, @@ -199,8 +194,8 @@ func NewAggregatorComponents( sequencer coresequencer.Sequencer, da coreda.DA, signer signer.Signer, - headerBroadcaster broadcaster[*types.SignedHeader], - dataBroadcaster broadcaster[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, metrics *Metrics, blockOpts BlockOptions, diff --git a/block/internal/common/event.go b/block/internal/common/event.go index 227b1ed39..69d0300f9 100644 --- a/block/internal/common/event.go +++ b/block/internal/common/event.go @@ -2,10 +2,22 @@ package common import "github.com/evstack/ev-node/types" +// EventSource represents the origin of a block event +type EventSource string + +const ( + // SourceDA indicates the event came from the DA layer + SourceDA EventSource = "DA" + // SourceP2P indicates the event came from P2P network + SourceP2P EventSource = "P2P" +) + // DAHeightEvent represents a DA event for caching type DAHeightEvent struct { Header *types.SignedHeader Data *types.Data // DaHeight corresponds to the highest DA included height between the Header and Data. DaHeight uint64 + // Source indicates where this event originated from (DA or P2P) + Source EventSource } diff --git a/block/internal/common/expected_interfaces.go b/block/internal/common/expected_interfaces.go new file mode 100644 index 000000000..c167e5da4 --- /dev/null +++ b/block/internal/common/expected_interfaces.go @@ -0,0 +1,13 @@ +package common + +import ( + "context" + + goheader "github.com/celestiaorg/go-header" +) + +// Broadcaster interface for handling P2P stores and broadcasting +type Broadcaster[H goheader.Header[H]] interface { + WriteToStoreAndBroadcast(ctx context.Context, payload H) error + Store() goheader.Store[H] +} diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 2876cb8c2..e3828911c 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -24,11 +24,6 @@ import ( "github.com/evstack/ev-node/types" ) -// broadcaster interface for P2P broadcasting -type broadcaster[T any] interface { - WriteToStoreAndBroadcast(ctx context.Context, payload T) error -} - // Executor handles block production, transaction processing, and state management type Executor struct { // Core components @@ -41,9 +36,9 @@ type Executor struct { cache cache.Manager metrics *common.Metrics - // Broadcasting - headerBroadcaster broadcaster[*types.SignedHeader] - dataBroadcaster broadcaster[*types.Data] + // P2P handling + headerBroadcaster common.Broadcaster[*types.SignedHeader] + dataBroadcaster common.Broadcaster[*types.Data] // Configuration config config.Config @@ -81,8 +76,8 @@ func NewExecutor( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerBroadcaster broadcaster[*types.SignedHeader], - dataBroadcaster broadcaster[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, diff --git a/block/internal/executing/executor_test.go b/block/internal/executing/executor_test.go index b5f9e2f47..bf8f752a5 100644 --- a/block/internal/executing/executor_test.go +++ b/block/internal/executing/executor_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + goheader "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -20,7 +21,7 @@ import ( ) // mockBroadcaster for testing -type mockBroadcaster[T any] struct { +type mockBroadcaster[T goheader.Header[T]] struct { called bool payload T } @@ -31,6 +32,10 @@ func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, paylo return nil } +func (m *mockBroadcaster[T]) Store() goheader.Store[T] { + panic("should not be needed") +} + func TestExecutor_BroadcasterIntegration(t *testing.T) { // Create in-memory store ds := sync.MutexWrap(datastore.NewMapDatastore()) diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index b57910151..08bc5d917 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -27,7 +27,6 @@ type DARetriever struct { da coreda.DA cache cache.Manager genesis genesis.Genesis - options common.BlockOptions logger zerolog.Logger // calculate namespaces bytes once and reuse them @@ -46,14 +45,12 @@ func NewDARetriever( cache cache.Manager, config config.Config, genesis genesis.Genesis, - options common.BlockOptions, logger zerolog.Logger, ) *DARetriever { return &DARetriever{ da: da, cache: cache, genesis: genesis, - options: options, logger: logger.With().Str("component", "da_retriever").Logger(), namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), @@ -210,6 +207,7 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight Header: header, Data: data, DaHeight: daHeight, + Source: common.SourceDA, } events = append(events, event) diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 22b27dcbc..e1fade293 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -59,7 +59,7 @@ func TestDARetriever_RetrieveFromDA_Invalid(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, errors.New("just invalid")).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) assert.Error(t, err) assert.Len(t, events, 0) @@ -77,7 +77,7 @@ func TestDARetriever_RetrieveFromDA_NotFound(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: whatever", coreda.ErrBlobNotFound.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) assert.True(t, errors.Is(err, coreda.ErrBlobNotFound)) assert.Len(t, events, 0) @@ -94,7 +94,7 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: later", coreda.ErrHeightFromFuture.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, derr := r.RetrieveFromDA(context.Background(), 1000) assert.Error(t, derr) assert.True(t, errors.Is(derr, coreda.ErrHeightFromFuture)) @@ -116,7 +116,7 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) { }). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) start := time.Now() events, err := r.RetrieveFromDA(context.Background(), 42) @@ -145,7 +145,7 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) events, err := r.RetrieveFromDA(context.Background(), 42) @@ -165,7 +165,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data) @@ -193,7 +193,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Header with no data hash present should trigger empty data creation (per current logic) hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil) @@ -221,7 +221,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil) gotH := r.tryDecodeHeader(hb, 123) @@ -257,7 +257,7 @@ func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { goodAddr, pub, signer := buildSyncTestSigner(t) badAddr := []byte("not-the-proposer") gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: badAddr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Signed data is made by goodAddr; retriever expects badAddr -> should be rejected db, _ := makeSignedDataBytes(t, gen.ChainID, 7, goodAddr, pub, signer, 1) @@ -310,7 +310,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { mockDA.EXPECT().Get(mock.Anything, mock.Anything, mock.MatchedBy(func(ns []byte) bool { return bytes.Equal(ns, namespaceDataBz) })). Return([][]byte{dataBin}, nil).Once() - r := NewDARetriever(mockDA, cm, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(mockDA, cm, cfg, gen, zerolog.Nop()) events, derr := r.RetrieveFromDA(context.Background(), 1234) require.NoError(t, derr) @@ -328,7 +328,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Create header and data for the same block height but from different DA heights dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2) @@ -364,7 +364,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop()) + r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Create multiple headers and data for different block heights data3Bin, data3 := makeSignedDataBytes(t, gen.ChainID, 3, addr, pub, signer, 1) diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index cd369e44a..ed14ebd82 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -3,11 +3,14 @@ package syncing import ( "bytes" "context" + "errors" "fmt" + "time" goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" + "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/types" @@ -17,8 +20,8 @@ import ( type P2PHandler struct { headerStore goheader.Store[*types.SignedHeader] dataStore goheader.Store[*types.Data] + cache cache.Manager genesis genesis.Genesis - options common.BlockOptions logger zerolog.Logger } @@ -26,36 +29,44 @@ type P2PHandler struct { func NewP2PHandler( headerStore goheader.Store[*types.SignedHeader], dataStore goheader.Store[*types.Data], + cache cache.Manager, genesis genesis.Genesis, - options common.BlockOptions, logger zerolog.Logger, ) *P2PHandler { return &P2PHandler{ headerStore: headerStore, dataStore: dataStore, + cache: cache, genesis: genesis, - options: options, logger: logger.With().Str("component", "p2p_handler").Logger(), } } // ProcessHeaderRange processes headers from the header store within the given range -func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent { +func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { if startHeight > endHeight { - return nil + return } - var events []common.DAHeightEvent - for height := startHeight; height <= endHeight; height++ { select { case <-ctx.Done(): - return events + return default: } - header, err := h.headerStore.GetByHeight(ctx, height) + // Create a timeout context for each GetByHeight call to prevent blocking + timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + header, err := h.headerStore.GetByHeight(timeoutCtx, height) + cancel() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later") + // Don't continue processing further heights if we timeout on one + // This prevents blocking on sequential heights + return + } h.logger.Debug().Uint64("height", height).Err(err).Msg("failed to get header from store") continue } @@ -72,9 +83,18 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei // Create empty data for headers with empty data hash data = h.createEmptyDataForHeader(ctx, header) } else { - // Try to get data from data store - retrievedData, err := h.dataStore.GetByHeight(ctx, height) + // Try to get data from data store with timeout + timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + retrievedData, err := h.dataStore.GetByHeight(timeoutCtx, height) + cancel() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later") + // Don't continue processing if data is not available + // Store event with header only for later processing + continue + } h.logger.Debug().Uint64("height", height).Err(err).Msg("could not retrieve data for header from data store") continue } @@ -89,40 +109,59 @@ func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHei Header: header, Data: data, DaHeight: 0, // P2P events don't have DA height context + Source: common.SourceP2P, } - events = append(events, event) + select { + case heightInCh <- event: + default: + h.cache.SetPendingEvent(event.Header.Height(), &event) + } h.logger.Debug().Uint64("height", height).Str("source", "p2p_headers").Msg("processed header from P2P") } - - return events } // ProcessDataRange processes data from the data store within the given range -func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64) []common.DAHeightEvent { +func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) { if startHeight > endHeight { - return nil + return } - var events []common.DAHeightEvent - for height := startHeight; height <= endHeight; height++ { select { case <-ctx.Done(): - return events + return default: } - data, err := h.dataStore.GetByHeight(ctx, height) + // Create a timeout context for each GetByHeight call to prevent blocking + timeoutCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + data, err := h.dataStore.GetByHeight(timeoutCtx, height) + cancel() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + h.logger.Debug().Uint64("height", height).Msg("timeout waiting for data from store, will retry later") + // Don't continue processing further heights if we timeout on one + // This prevents blocking on sequential heights + return + } h.logger.Debug().Uint64("height", height).Err(err).Msg("failed to get data from store") continue } - // Get corresponding header - header, err := h.headerStore.GetByHeight(ctx, height) + // Get corresponding header with timeout + timeoutCtx, cancel = context.WithTimeout(ctx, 500*time.Millisecond) + header, err := h.headerStore.GetByHeight(timeoutCtx, height) + cancel() + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + h.logger.Debug().Uint64("height", height).Msg("timeout waiting for header from store, will retry later") + // Don't continue processing if header is not available + continue + } h.logger.Debug().Uint64("height", height).Err(err).Msg("could not retrieve header for data from header store") continue } @@ -141,14 +180,17 @@ func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeigh Header: header, Data: data, DaHeight: 0, // P2P events don't have DA height context + Source: common.SourceP2P, } - events = append(events, event) + select { + case heightInCh <- event: + default: + h.cache.SetPendingEvent(event.Header.Height(), &event) + } h.logger.Debug().Uint64("height", height).Str("source", "p2p_data").Msg("processed data from P2P") } - - return events } // assertExpectedProposer validates the proposer address @@ -166,8 +208,12 @@ func (h *P2PHandler) createEmptyDataForHeader(ctx context.Context, header *types var lastDataHash types.Hash if headerHeight > 1 { - // Try to get previous data hash, but don't fail if not available - if prevData, err := h.dataStore.GetByHeight(ctx, headerHeight-1); err == nil && prevData != nil { + // Try to get previous data hash with a short timeout, but don't fail if not available + timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + prevData, err := h.dataStore.GetByHeight(timeoutCtx, headerHeight-1) + cancel() + + if err == nil && prevData != nil { lastDataHash = prevData.Hash() } else { h.logger.Debug().Uint64("current_height", headerHeight).Uint64("previous_height", headerHeight-1). diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 11f24caca..2265b5397 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -7,14 +7,19 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" + storemocks "github.com/evstack/ev-node/test/mocks" extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -56,13 +61,14 @@ type P2PTestData struct { Handler *P2PHandler HeaderStore *extmocks.MockStore[*types.SignedHeader] DataStore *extmocks.MockStore[*types.Data] + Cache cache.Manager Genesis genesis.Genesis ProposerAddr []byte ProposerPub crypto.PubKey Signer signerpkg.Signer } -// setupP2P constructs a P2PHandler with mocked go-header stores +// setupP2P constructs a P2PHandler with mocked go-header stores and real cache func setupP2P(t *testing.T) *P2PTestData { t.Helper() proposerAddr, proposerPub, signer := buildTestSigner(t) @@ -72,11 +78,28 @@ func setupP2P(t *testing.T) *P2PTestData { headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) dataStoreMock := extmocks.NewMockStore[*types.Data](t) - handler := NewP2PHandler(headerStoreMock, dataStoreMock, gen, common.DefaultBlockOptions(), zerolog.Nop()) + // Create a real cache manager for tests + storeMock := storemocks.NewMockStore(t) + // Mock the methods that cache manager initialization will call + // Return ErrNotFound for non-existent metadata keys + storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-header-height").Return(nil, ds.ErrNotFound).Maybe() + storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-data-height").Return(nil, ds.ErrNotFound).Maybe() + storeMock.EXPECT().Height(mock.Anything).Return(uint64(0), nil).Maybe() + storeMock.EXPECT().SetMetadata(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + + cfg := config.Config{ + RootDir: t.TempDir(), + ClearCache: true, + } + cacheManager, err := cache.NewManager(cfg, storeMock, zerolog.Nop()) + require.NoError(t, err, "failed to create cache manager") + + handler := NewP2PHandler(headerStoreMock, dataStoreMock, cacheManager, gen, zerolog.Nop()) return &P2PTestData{ Handler: handler, HeaderStore: headerStoreMock, DataStore: dataStoreMock, + Cache: cacheManager, Genesis: gen, ProposerAddr: proposerAddr, ProposerPub: proposerPub, @@ -84,6 +107,29 @@ func setupP2P(t *testing.T) *P2PTestData { } } +// collectEvents reads events from a channel with a timeout +func collectEvents(t *testing.T, ch <-chan common.DAHeightEvent, timeout time.Duration) []common.DAHeightEvent { + t.Helper() + var events []common.DAHeightEvent + deadline := time.After(timeout) + for { + select { + case event := <-ch: + events = append(events, event) + case <-deadline: + return events + case <-time.After(10 * time.Millisecond): + // Give it a moment to check if more events are coming + select { + case event := <-ch: + events = append(events, event) + default: + return events + } + } + } +} + func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { p2pData := setupP2P(t) ctx := context.Background() @@ -104,10 +150,14 @@ func TestP2PHandler_ProcessHeaderRange_HeaderAndDataHappyPath(t *testing.T) { // Sanity: header should validate with data using default sync verifier require.NoError(t, signedHeader.ValidateBasicWithData(blockData), "header+data must validate before handler processes them") - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(signedHeader, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(blockData, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(signedHeader, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(blockData, nil).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 5) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 5, 5, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 1, "expected one event for the provided header/data height") require.Equal(t, uint64(5), events[0].Header.Height()) require.NotNil(t, events[0].Data) @@ -125,10 +175,14 @@ func TestP2PHandler_ProcessHeaderRange_MissingData_NonEmptyHash(t *testing.T) { blockData := makeData(p2pData.Genesis.ChainID, 7, 1) signedHeader.DataHash = blockData.DACommitment() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(7)).Return(signedHeader, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(7)).Return(nil, errors.New("not found")).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(signedHeader, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("not found")).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 7, 7, ch) - events := p2pData.Handler.ProcessHeaderRange(ctx, 7, 7) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -137,10 +191,14 @@ func TestP2PHandler_ProcessDataRange_HeaderMissing(t *testing.T) { ctx := context.Background() blockData := makeData(p2pData.Genesis.ChainID, 9, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(blockData, nil).Once() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(nil, errors.New("no header")).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(blockData, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("no header")).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessDataRange(ctx, 9, 9, ch) - events := p2pData.Handler.ProcessDataRange(ctx, 9, 9) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -154,9 +212,13 @@ func TestP2PHandler_ProposerMismatch_Rejected(t *testing.T) { signedHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 4, badAddr, pub, signer) signedHeader.DataHash = common.DataHashForEmptyTxs - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(4)).Return(signedHeader, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(4)).Return(signedHeader, nil).Once() - events := p2pData.Handler.ProcessHeaderRange(ctx, 4, 4) + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 4, 4, ch) + + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0) } @@ -170,7 +232,7 @@ func TestP2PHandler_CreateEmptyDataForHeader_UsesPreviousDataHash(t *testing.T) // Mock previous data at height 9 so handler can propagate its hash previousData := makeData(p2pData.Genesis.ChainID, 9, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(9)).Return(previousData, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(previousData, nil).Once() emptyData := p2pData.Handler.createEmptyDataForHeader(ctx, signedHeader) require.NotNil(t, emptyData, "handler should synthesize empty data when header declares empty data hash") @@ -189,7 +251,7 @@ func TestP2PHandler_CreateEmptyDataForHeader_NoPreviousData(t *testing.T) { signedHeader.DataHash = common.DataHashForEmptyTxs // Mock previous data fetch failure - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(1)).Return(nil, errors.New("not available")).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(1)).Return(nil, errors.New("not available")).Once() emptyData := p2pData.Handler.createEmptyDataForHeader(ctx, signedHeader) require.NotNil(t, emptyData, "handler should synthesize empty data even when previous data is unavailable") @@ -229,12 +291,16 @@ func TestP2PHandler_ProcessHeaderRange_MultipleHeightsHappyPath(t *testing.T) { require.NoError(t, header6.ValidateBasicWithData(data6), "header/data invalid for height 6") // Expectations for both heights - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(header5, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(5)).Return(data5, nil).Once() - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(6)).Return(header6, nil).Once() - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(6)).Return(data6, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(header5, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data5, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header6, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data6, nil).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessHeaderRange(ctx, 5, 6, ch) - events := p2pData.Handler.ProcessHeaderRange(ctx, 5, 6) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 2, "expected two events for heights 5 and 6") require.Equal(t, uint64(5), events[0].Header.Height(), "first event should be height 5") require.Equal(t, uint64(6), events[1].Header.Height(), "second event should be height 6") @@ -248,14 +314,18 @@ func TestP2PHandler_ProcessDataRange_HeaderValidateHeaderFails(t *testing.T) { // Data exists at height 3 blockData := makeData(p2pData.Genesis.ChainID, 3, 1) - p2pData.DataStore.EXPECT().GetByHeight(ctx, uint64(3)).Return(blockData, nil).Once() + p2pData.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(blockData, nil).Once() // Header proposer does not match genesis -> validateHeader should fail badAddr, pub, signer := buildTestSigner(t) require.NotEqual(t, string(p2pData.Genesis.ProposerAddress), string(badAddr), "negative test requires mismatched proposer") badHeader := p2pMakeSignedHeader(t, p2pData.Genesis.ChainID, 3, badAddr, pub, signer) - p2pData.HeaderStore.EXPECT().GetByHeight(ctx, uint64(3)).Return(badHeader, nil).Once() + p2pData.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(3)).Return(badHeader, nil).Once() + + // Create channel and process + ch := make(chan common.DAHeightEvent, 10) + p2pData.Handler.ProcessDataRange(ctx, 3, 3, ch) - events := p2pData.Handler.ProcessDataRange(ctx, 3, 3) + events := collectEvents(t, ch, 100*time.Millisecond) require.Len(t, events, 0, "validateHeader failure should drop event") } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 4e64188b0..ae64827d9 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -9,8 +9,8 @@ import ( "sync/atomic" "time" - goheader "github.com/celestiaorg/go-header" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" @@ -25,9 +25,10 @@ import ( type daRetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) } + type p2pHandler interface { - ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent - ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent + ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) + ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64, heightInCh chan<- common.DAHeightEvent) } // Syncer handles block synchronization from DA and P2P sources. @@ -52,9 +53,9 @@ type Syncer struct { // DA state daHeight *atomic.Uint64 - // P2P stores - headerStore goheader.Store[*types.SignedHeader] - dataStore goheader.Store[*types.Data] + // P2P handling + headerBroadcaster common.Broadcaster[*types.SignedHeader] + dataBroadcaster common.Broadcaster[*types.Data] // Channels for coordination heightInCh chan common.DAHeightEvent @@ -82,28 +83,28 @@ func NewSyncer( metrics *common.Metrics, config config.Config, genesis genesis.Genesis, - headerStore goheader.Store[*types.SignedHeader], - dataStore goheader.Store[*types.Data], + headerBroadcaster common.Broadcaster[*types.SignedHeader], + dataBroadcaster common.Broadcaster[*types.Data], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, ) *Syncer { return &Syncer{ - store: store, - exec: exec, - da: da, - cache: cache, - metrics: metrics, - config: config, - genesis: genesis, - options: options, - headerStore: headerStore, - dataStore: dataStore, - lastState: &atomic.Pointer[types.State]{}, - daHeight: &atomic.Uint64{}, - heightInCh: make(chan common.DAHeightEvent, 10_000), - errorCh: errorCh, - logger: logger.With().Str("component", "syncer").Logger(), + store: store, + exec: exec, + da: da, + cache: cache, + metrics: metrics, + config: config, + genesis: genesis, + options: options, + headerBroadcaster: headerBroadcaster, + dataBroadcaster: dataBroadcaster, + lastState: &atomic.Pointer[types.State]{}, + daHeight: &atomic.Uint64{}, + heightInCh: make(chan common.DAHeightEvent, 10_000), + errorCh: errorCh, + logger: logger.With().Str("component", "syncer").Logger(), } } @@ -117,8 +118,8 @@ func (s *Syncer) Start(ctx context.Context) error { } // Initialize handlers - s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger) - s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger) + s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger) + s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.cache, s.genesis, s.logger) // Start main processing loop s.wg.Add(1) @@ -236,50 +237,50 @@ func (s *Syncer) syncLoop() { } } - initialHeight, err := s.store.Height(s.ctx) - if err != nil { - s.logger.Error().Err(err).Msg("failed to get initial height") - return - } - - lastHeaderHeight := &initialHeight - lastDataHeight := &initialHeight - // Backoff control when DA replies with errors nextDARequestAt := &time.Time{} - blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration) - defer blockTicker.Stop() - for { + wg := sync.WaitGroup{} + select { case <-s.ctx.Done(): return default: } - // Process pending events from cache on every iteration - s.processPendingEvents() - - fetchedP2pEvent := s.tryFetchFromP2P(lastHeaderHeight, lastDataHeight, blockTicker.C) - fetchedDaEvent := s.tryFetchFromDA(nextDARequestAt) - - // Prevent busy-waiting when no events are available - if !fetchedDaEvent && !fetchedP2pEvent { - time.Sleep(min(10*time.Millisecond, s.config.Node.BlockTime.Duration)) - } + wg.Add(1) + go func() { + defer wg.Done() + s.processPendingEvents() + }() + + wg.Add(1) + go func() { + defer wg.Done() + s.tryFetchFromP2P() + }() + + wg.Add(1) + go func() { + defer wg.Done() + s.tryFetchFromDA(nextDARequestAt) + }() + + // wait for pending events processing, p2p and da fetching + wg.Wait() } } // tryFetchFromDA attempts to fetch events from the DA layer. // It handles backoff timing, DA height management, and error classification. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { +func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) { now := time.Now() daHeight := s.GetDAHeight() // Respect backoff window if set if !nextDARequestAt.IsZero() && now.Before(*nextDARequestAt) { - return false + return } // Retrieve from DA as fast as possible (unless throttled by HFF) @@ -291,7 +292,7 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { s.SetDAHeight(daHeight + 1) // Reset backoff on success *nextDARequestAt = time.Time{} - return false + return } // Back off exactly by DA block time to avoid overloading @@ -302,8 +303,7 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { *nextDARequestAt = now.Add(backoffDelay) s.logger.Error().Err(err).Dur("delay", backoffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests") - - return false + return } // Reset backoff on success @@ -320,57 +320,29 @@ func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) bool { // increment DA height on successful retrieval s.SetDAHeight(daHeight + 1) - return len(events) > 0 } // tryFetchFromP2P attempts to fetch events from P2P stores. // It processes both header and data ranges when the block ticker fires. // Returns true if any events were successfully processed. -func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, blockTicker <-chan time.Time) bool { - eventsProcessed := false - - select { - case <-blockTicker: - // Process headers - newHeaderHeight := s.headerStore.Height() - if newHeaderHeight > *lastHeaderHeight { - events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - *lastHeaderHeight = newHeaderHeight - if len(events) > 0 { - eventsProcessed = true - } - } +func (s *Syncer) tryFetchFromP2P() { + currentHeight, err := s.store.Height(s.ctx) + if err != nil { + s.logger.Error().Err(err).Msg("failed to get current height") + return + } - // Process data - newDataHeight := s.dataStore.Height() - if newDataHeight == newHeaderHeight { - *lastDataHeight = newDataHeight - } else if newDataHeight > *lastDataHeight { - events := s.p2pHandler.ProcessDataRange(s.ctx, *lastDataHeight+1, newDataHeight) - for _, event := range events { - select { - case s.heightInCh <- event: - default: - s.cache.SetPendingEvent(event.Header.Height(), &event) - } - } - *lastDataHeight = newDataHeight - if len(events) > 0 { - eventsProcessed = true - } - } - default: - // No P2P events available + // Process headers + newHeaderHeight := s.headerBroadcaster.Store().Height() + if newHeaderHeight > currentHeight { + s.p2pHandler.ProcessHeaderRange(s.ctx, currentHeight+1, newHeaderHeight, s.heightInCh) } - return eventsProcessed + // Process data (if not already processed by headers) + newDataHeight := s.dataBroadcaster.Store().Height() + if newDataHeight != newHeaderHeight && newDataHeight > currentHeight { + s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh) + } } func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { @@ -412,6 +384,16 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { } return } + + // only save to p2p stores if the event came from DA + if event.Source == common.SourceDA { + g, ctx := errgroup.WithContext(s.ctx) + g.Go(func() error { return s.headerBroadcaster.WriteToStoreAndBroadcast(ctx, event.Header) }) + g.Go(func() error { return s.dataBroadcaster.WriteToStoreAndBroadcast(ctx, event.Data) }) + if err := g.Wait(); err != nil { + s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store") + } + } } // errInvalidBlock is returned when a block is failing validation @@ -606,6 +588,7 @@ func (s *Syncer) processPendingEvents() { Header: event.Header, Data: event.Data, DaHeight: event.DaHeight, + Source: event.Source, } select { diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 2dc2bd804..ecaad863b 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + goheader "github.com/celestiaorg/go-header" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" @@ -24,6 +25,19 @@ import ( "github.com/evstack/ev-node/types" ) +// mockBroadcaster for testing +type mockBroadcaster[T goheader.Header[T]] struct { + store goheader.Store[T] +} + +func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error { + return nil +} + +func (m *mockBroadcaster[T]) Store() goheader.Store[T] { + return m.store +} + // TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff // behavior when encountering different types of DA layer errors. func TestSyncer_BackoffOnDAError(t *testing.T) { @@ -76,11 +90,11 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time callCount := 0 @@ -167,11 +181,11 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time @@ -253,11 +267,11 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { headerStore := mocks.NewMockStore[*types.SignedHeader](t) headerStore.On("Height").Return(uint64(0)).Maybe() - syncer.headerStore = headerStore + syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore} dataStore := mocks.NewMockStore[*types.Data](t) dataStore.On("Height").Return(uint64(0)).Maybe() - syncer.dataStore = dataStore + syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore} var callTimes []time.Time @@ -335,8 +349,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 28cf2af22..355187cee 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -108,8 +108,8 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay common.NopMetrics(), cfg, gen, - nil, // headerStore not used; we inject P2P directly to channel when needed - nil, // dataStore not used + nil, // we inject P2P directly to channel when needed + nil, // injected when needed zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -152,9 +152,9 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b) headerP2PStore.On("Height").Return(uint64(0)).Maybe() - s.headerStore = headerP2PStore + s.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerP2PStore} dataP2PStore := mocks.NewMockStore[*types.Data](b) dataP2PStore.On("Height").Return(uint64(0)).Maybe() - s.dataStore = dataP2PStore + s.dataBroadcaster = &mockBroadcaster[*types.Data]{dataP2PStore} return &benchFixture{s: s, st: st, cm: cm, cancel: cancel} } diff --git a/block/internal/syncing/syncer_mock.go b/block/internal/syncing/syncer_mock.go index 2413db59c..85cad4696 100644 --- a/block/internal/syncing/syncer_mock.go +++ b/block/internal/syncing/syncer_mock.go @@ -134,22 +134,9 @@ func (_m *mockp2pHandler) EXPECT() *mockp2pHandler_Expecter { } // ProcessDataRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent { - ret := _mock.Called(ctx, fromHeight, toHeight) - - if len(ret) == 0 { - panic("no return value specified for ProcessDataRange") - } - - var r0 []common.DAHeightEvent - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, fromHeight, toHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - return r0 +func (_mock *mockp2pHandler) ProcessDataRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { + _mock.Called(ctx, fromHeight, toHeight, heightInCh) + return } // mockp2pHandler_ProcessDataRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessDataRange' @@ -161,11 +148,12 @@ type mockp2pHandler_ProcessDataRange_Call struct { // - ctx context.Context // - fromHeight uint64 // - toHeight uint64 -func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}) *mockp2pHandler_ProcessDataRange_Call { - return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight)} +// - heightInCh chan<- common.DAHeightEvent +func (_e *mockp2pHandler_Expecter) ProcessDataRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessDataRange_Call { + return &mockp2pHandler_ProcessDataRange_Call{Call: _e.mock.On("ProcessDataRange", ctx, fromHeight, toHeight, heightInCh)} } -func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64)) *mockp2pHandler_ProcessDataRange_Call { +func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -179,42 +167,34 @@ func (_c *mockp2pHandler_ProcessDataRange_Call) Run(run func(ctx context.Context if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 chan<- common.DAHeightEvent + if args[3] != nil { + arg3 = args[3].(chan<- common.DAHeightEvent) + } run( arg0, arg1, arg2, + arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessDataRange_Call) Return(dAHeightEvents []common.DAHeightEvent) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return(dAHeightEvents) +func (_c *mockp2pHandler_ProcessDataRange_Call) Return() *mockp2pHandler_ProcessDataRange_Call { + _c.Call.Return() return _c } -func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent) *mockp2pHandler_ProcessDataRange_Call { - _c.Call.Return(run) +func (_c *mockp2pHandler_ProcessDataRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessDataRange_Call { + _c.Run(run) return _c } // ProcessHeaderRange provides a mock function for the type mockp2pHandler -func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent { - ret := _mock.Called(ctx, fromHeight, toHeight) - - if len(ret) == 0 { - panic("no return value specified for ProcessHeaderRange") - } - - var r0 []common.DAHeightEvent - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, fromHeight, toHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - return r0 +func (_mock *mockp2pHandler) ProcessHeaderRange(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent) { + _mock.Called(ctx, fromHeight, toHeight, heightInCh) + return } // mockp2pHandler_ProcessHeaderRange_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessHeaderRange' @@ -226,11 +206,12 @@ type mockp2pHandler_ProcessHeaderRange_Call struct { // - ctx context.Context // - fromHeight uint64 // - toHeight uint64 -func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}) *mockp2pHandler_ProcessHeaderRange_Call { - return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight)} +// - heightInCh chan<- common.DAHeightEvent +func (_e *mockp2pHandler_Expecter) ProcessHeaderRange(ctx interface{}, fromHeight interface{}, toHeight interface{}, heightInCh interface{}) *mockp2pHandler_ProcessHeaderRange_Call { + return &mockp2pHandler_ProcessHeaderRange_Call{Call: _e.mock.On("ProcessHeaderRange", ctx, fromHeight, toHeight, heightInCh)} } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64)) *mockp2pHandler_ProcessHeaderRange_Call { +func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -244,21 +225,26 @@ func (_c *mockp2pHandler_ProcessHeaderRange_Call) Run(run func(ctx context.Conte if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 chan<- common.DAHeightEvent + if args[3] != nil { + arg3 = args[3].(chan<- common.DAHeightEvent) + } run( arg0, arg1, arg2, + arg3, ) }) return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return(dAHeightEvents []common.DAHeightEvent) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return(dAHeightEvents) +func (_c *mockp2pHandler_ProcessHeaderRange_Call) Return() *mockp2pHandler_ProcessHeaderRange_Call { + _c.Call.Return() return _c } -func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64) []common.DAHeightEvent) *mockp2pHandler_ProcessHeaderRange_Call { - _c.Call.Return(run) +func (_c *mockp2pHandler_ProcessHeaderRange_Call) RunAndReturn(run func(ctx context.Context, fromHeight uint64, toHeight uint64, heightInCh chan<- common.DAHeightEvent)) *mockp2pHandler_ProcessHeaderRange_Call { + _c.Run(run) return _c } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 0fe7d5192..a326950bd 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -13,6 +13,7 @@ import ( signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" testmocks "github.com/evstack/ev-node/test/mocks" + mocks "github.com/evstack/ev-node/test/mocks/external" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" @@ -107,8 +108,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -155,8 +156,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -205,8 +206,8 @@ func TestSequentialBlockSync(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{}, + &mockBroadcaster[*types.Data]{}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -323,6 +324,10 @@ func TestSyncLoopPersistState(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr, DAStartHeight: myDAHeightOffset} dummyExec := execution.NewDummyExecutor() + mockP2PHeaderStore := &mocks.MockStore[*types.SignedHeader]{} + mockP2PDataStore := &mocks.MockStore[*types.Data]{} + mockP2PHeaderStore.On("Height", mock.Anything).Return(uint64(1), nil).Maybe() + mockP2PDataStore.On("Height", mock.Anything).Return(uint64(1), nil).Maybe() syncerInst1 := NewSyncer( st, @@ -332,8 +337,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{mockP2PHeaderStore}, + &mockBroadcaster[*types.Data]{mockP2PDataStore}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -343,6 +348,8 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) syncerInst1.ctx = ctx daRtrMock, p2pHndlMock := newMockdaRetriever(t), newMockp2pHandler(t) + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock // with n da blobs fetched @@ -409,8 +416,8 @@ func TestSyncLoopPersistState(t *testing.T) { common.NopMetrics(), cfg, gen, - nil, - nil, + &mockBroadcaster[*types.SignedHeader]{mockP2PHeaderStore}, + &mockBroadcaster[*types.Data]{mockP2PDataStore}, zerolog.Nop(), common.DefaultBlockOptions(), make(chan error, 1), @@ -422,6 +429,8 @@ func TestSyncLoopPersistState(t *testing.T) { t.Cleanup(cancel) syncerInst2.ctx = ctx daRtrMock, p2pHndlMock = newMockdaRetriever(t), newMockp2pHandler(t) + p2pHndlMock.On("ProcessHeaderRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + p2pHndlMock.On("ProcessDataRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Maybe() syncerInst2.daRetriever, syncerInst2.p2pHandler = daRtrMock, p2pHndlMock daRtrMock.On("RetrieveFromDA", mock.Anything, mock.Anything). diff --git a/node/full.go b/node/full.go index c1704355d..947c16478 100644 --- a/node/full.go +++ b/node/full.go @@ -120,8 +120,8 @@ func newFullNode( rktStore, exec, da, - headerSyncService.Store(), - dataSyncService.Store(), + headerSyncService, + dataSyncService, logger, blockMetrics, nodeOpts.BlockOptions, diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 6af6e40ab..ccaa80453 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -117,7 +117,7 @@ func newSyncService[H header.Header[H]]( } // Store returns the store of the SyncService -func (syncService *SyncService[H]) Store() *goheaderstore.Store[H] { +func (syncService *SyncService[H]) Store() header.Store[H] { return syncService.store } @@ -420,6 +420,7 @@ func newSyncer[H header.Header[H]]( opts = append(opts, goheadersync.WithMetrics(), goheadersync.WithPruningWindow(ninetyNineYears), + goheadersync.WithTrustingPeriod(ninetyNineYears), ) return goheadersync.NewSyncer(ex, store, sub, opts...) }