Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"errors"
"fmt"

goheader "github.com/celestiaorg/go-header"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/executing"
"github.com/evstack/ev-node/block/internal/reaping"
"github.com/evstack/ev-node/block/internal/submitting"
Expand Down Expand Up @@ -122,11 +122,6 @@ func (bc *Components) Stop() error {
return errs
}

// broadcaster interface for P2P broadcasting
type broadcaster[T any] interface {
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
}

// NewSyncComponents creates components for a non-aggregator full node that can only sync blocks.
// Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA.
// They have more sync capabilities than light nodes but no block production. No signer required.
Expand All @@ -136,8 +131,8 @@ func NewSyncComponents(
store store.Store,
exec coreexecutor.Executor,
da coreda.DA,
headerStore goheader.Store[*types.SignedHeader],
dataStore goheader.Store[*types.Data],
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
Expand All @@ -158,8 +153,8 @@ func NewSyncComponents(
metrics,
config,
genesis,
headerStore,
dataStore,
headerBroadcaster,
dataBroadcaster,
logger,
blockOpts,
errorCh,
Expand Down Expand Up @@ -199,8 +194,8 @@ func NewAggregatorComponents(
sequencer coresequencer.Sequencer,
da coreda.DA,
signer signer.Signer,
headerBroadcaster broadcaster[*types.SignedHeader],
dataBroadcaster broadcaster[*types.Data],
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
logger zerolog.Logger,
metrics *Metrics,
blockOpts BlockOptions,
Expand Down
12 changes: 12 additions & 0 deletions block/internal/common/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@ package common

import "github.com/evstack/ev-node/types"

// EventSource represents the origin of a block event
type EventSource string

const (
// SourceDA indicates the event came from the DA layer
SourceDA EventSource = "DA"
// SourceP2P indicates the event came from P2P network
SourceP2P EventSource = "P2P"
)

// DAHeightEvent represents a DA event for caching
type DAHeightEvent struct {
Header *types.SignedHeader
Data *types.Data
// DaHeight corresponds to the highest DA included height between the Header and Data.
DaHeight uint64
// Source indicates where this event originated from (DA or P2P)
Source EventSource
}
13 changes: 13 additions & 0 deletions block/internal/common/expected_interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common

import (
"context"

goheader "github.com/celestiaorg/go-header"
)

// Broadcaster interface for handling P2P stores and broadcasting
type Broadcaster[H goheader.Header[H]] interface {
WriteToStoreAndBroadcast(ctx context.Context, payload H) error
Store() goheader.Store[H]
}
15 changes: 5 additions & 10 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ import (
"github.com/evstack/ev-node/types"
)

// broadcaster interface for P2P broadcasting
type broadcaster[T any] interface {
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
}

// Executor handles block production, transaction processing, and state management
type Executor struct {
// Core components
Expand All @@ -41,9 +36,9 @@ type Executor struct {
cache cache.Manager
metrics *common.Metrics

// Broadcasting
headerBroadcaster broadcaster[*types.SignedHeader]
dataBroadcaster broadcaster[*types.Data]
// P2P handling
headerBroadcaster common.Broadcaster[*types.SignedHeader]
dataBroadcaster common.Broadcaster[*types.Data]

// Configuration
config config.Config
Expand Down Expand Up @@ -81,8 +76,8 @@ func NewExecutor(
metrics *common.Metrics,
config config.Config,
genesis genesis.Genesis,
headerBroadcaster broadcaster[*types.SignedHeader],
dataBroadcaster broadcaster[*types.Data],
headerBroadcaster common.Broadcaster[*types.SignedHeader],
dataBroadcaster common.Broadcaster[*types.Data],
logger zerolog.Logger,
options common.BlockOptions,
errorCh chan<- error,
Expand Down
7 changes: 6 additions & 1 deletion block/internal/executing/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

goheader "github.com/celestiaorg/go-header"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/rs/zerolog"
Expand All @@ -20,7 +21,7 @@ import (
)

// mockBroadcaster for testing
type mockBroadcaster[T any] struct {
type mockBroadcaster[T goheader.Header[T]] struct {
called bool
payload T
}
Expand All @@ -31,6 +32,10 @@ func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, paylo
return nil
}

func (m *mockBroadcaster[T]) Store() goheader.Store[T] {
panic("should not be needed")
}

func TestExecutor_BroadcasterIntegration(t *testing.T) {
// Create in-memory store
ds := sync.MutexWrap(datastore.NewMapDatastore())
Expand Down
4 changes: 1 addition & 3 deletions block/internal/syncing/da_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type DARetriever struct {
da coreda.DA
cache cache.Manager
genesis genesis.Genesis
options common.BlockOptions
logger zerolog.Logger

// calculate namespaces bytes once and reuse them
Expand All @@ -46,14 +45,12 @@ func NewDARetriever(
cache cache.Manager,
config config.Config,
genesis genesis.Genesis,
options common.BlockOptions,
logger zerolog.Logger,
) *DARetriever {
return &DARetriever{
da: da,
cache: cache,
genesis: genesis,
options: options,
logger: logger.With().Str("component", "da_retriever").Logger(),
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
Expand Down Expand Up @@ -210,6 +207,7 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
Header: header,
Data: data,
DaHeight: daHeight,
Source: common.SourceDA,
}

events = append(events, event)
Expand Down
24 changes: 12 additions & 12 deletions block/internal/syncing/da_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestDARetriever_RetrieveFromDA_Invalid(t *testing.T) {
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("just invalid")).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())
events, err := r.RetrieveFromDA(context.Background(), 42)
assert.Error(t, err)
assert.Len(t, events, 0)
Expand All @@ -77,7 +77,7 @@ func TestDARetriever_RetrieveFromDA_NotFound(t *testing.T) {
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("%s: whatever", coreda.ErrBlobNotFound.Error())).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())
events, err := r.RetrieveFromDA(context.Background(), 42)
assert.True(t, errors.Is(err, coreda.ErrBlobNotFound))
assert.Len(t, events, 0)
Expand All @@ -94,7 +94,7 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) {
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("%s: later", coreda.ErrHeightFromFuture.Error())).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())
events, derr := r.RetrieveFromDA(context.Background(), 1000)
assert.Error(t, derr)
assert.True(t, errors.Is(derr, coreda.ErrHeightFromFuture))
Expand All @@ -116,7 +116,7 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) {
}).
Return(nil, context.DeadlineExceeded).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())

start := time.Now()
events, err := r.RetrieveFromDA(context.Background(), 42)
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) {
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
Return(nil, context.DeadlineExceeded).Maybe()

r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop())

events, err := r.RetrieveFromDA(context.Background(), 42)

Expand All @@ -165,7 +165,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data)
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {

addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Header with no data hash present should trigger empty data creation (per current logic)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil)
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) {

addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil)
gotH := r.tryDecodeHeader(hb, 123)
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) {
goodAddr, pub, signer := buildSyncTestSigner(t)
badAddr := []byte("not-the-proposer")
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: badAddr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Signed data is made by goodAddr; retriever expects badAddr -> should be rejected
db, _ := makeSignedDataBytes(t, gen.ChainID, 7, goodAddr, pub, signer, 1)
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) {
mockDA.EXPECT().Get(mock.Anything, mock.Anything, mock.MatchedBy(func(ns []byte) bool { return bytes.Equal(ns, namespaceDataBz) })).
Return([][]byte{dataBin}, nil).Once()

r := NewDARetriever(mockDA, cm, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(mockDA, cm, cfg, gen, zerolog.Nop())

events, derr := r.RetrieveFromDA(context.Background(), 1234)
require.NoError(t, derr)
Expand All @@ -328,7 +328,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {
addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Create header and data for the same block height but from different DA heights
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2)
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
addr, pub, signer := buildSyncTestSigner(t)
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, common.DefaultBlockOptions(), zerolog.Nop())
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Create multiple headers and data for different block heights
data3Bin, data3 := makeSignedDataBytes(t, gen.ChainID, 3, addr, pub, signer, 1)
Expand Down
Loading
Loading