Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -50,8 +51,7 @@ type Executor struct {
options common.BlockOptions

// State management
lastState types.State
lastStateMtx *sync.RWMutex
lastState *atomic.Pointer[types.State]

// Channels for coordination
txNotifyCh chan struct{}
Expand Down Expand Up @@ -112,7 +112,7 @@ func NewExecutor(
headerBroadcaster: headerBroadcaster,
dataBroadcaster: dataBroadcaster,
options: options,
lastStateMtx: &sync.RWMutex{},
lastState: &atomic.Pointer[types.State]{},
txNotifyCh: make(chan struct{}, 1),
errorCh: errorCh,
logger: logger.With().Str("component", "executor").Logger(),
Expand Down Expand Up @@ -150,18 +150,29 @@ func (e *Executor) Stop() error {
return nil
}

// GetLastState returns the current state
// GetLastState returns the current state.
func (e *Executor) GetLastState() types.State {
e.lastStateMtx.RLock()
defer e.lastStateMtx.RUnlock()
return e.lastState
state := e.getLastState()
state.AppHash = bytes.Clone(state.AppHash)
state.LastResultsHash = bytes.Clone(state.LastResultsHash)

return state
}

// getLastState returns the current state.
// getLastState should never directly mutate.
func (e *Executor) getLastState() types.State {
state := e.lastState.Load()
if state == nil {
return types.State{}
}

return *state
}

// SetLastState updates the current state
func (e *Executor) SetLastState(state types.State) {
e.lastStateMtx.Lock()
defer e.lastStateMtx.Unlock()
e.lastState = state
// setLastState updates the current state
func (e *Executor) setLastState(state types.State) {
e.lastState.Store(&state)
}

// NotifyNewTransactions signals that new transactions are available
Expand Down Expand Up @@ -198,7 +209,7 @@ func (e *Executor) initializeState() error {
}
}

e.SetLastState(state)
e.setLastState(state)

// Set store height
if err := e.store.SetHeight(e.ctx, state.LastBlockHeight); err != nil {
Expand All @@ -218,7 +229,7 @@ func (e *Executor) executionLoop() {

var delay time.Duration
initialHeight := e.genesis.InitialHeight
currentState := e.GetLastState()
currentState := e.getLastState()

if currentState.LastBlockHeight < initialHeight {
delay = time.Until(e.genesis.StartTime.Add(e.config.Node.BlockTime.Duration))
Expand Down Expand Up @@ -291,7 +302,7 @@ func (e *Executor) produceBlock() error {
}
}()

currentState := e.GetLastState()
currentState := e.getLastState()
newHeight := currentState.LastBlockHeight + 1

e.logger.Debug().Uint64("height", newHeight).Msg("producing block")
Expand Down Expand Up @@ -429,7 +440,7 @@ func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) {

// createBlock creates a new block from the given batch
func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
currentState := e.GetLastState()
currentState := e.getLastState()
headerTime := uint64(e.genesis.StartTime.UnixNano())

// Get last block info
Expand Down Expand Up @@ -518,7 +529,7 @@ func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *Ba

// applyBlock applies the block to get the new state
func (e *Executor) applyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) {
currentState := e.GetLastState()
currentState := e.getLastState()

// Prepare transactions
rawTxs := make([][]byte, len(data.Txs))
Expand Down Expand Up @@ -601,7 +612,7 @@ func (e *Executor) updateState(ctx context.Context, newState types.State) error
return err
}

e.SetLastState(newState)
e.setLastState(newState)
e.metrics.Height.Set(float64(newState.LastBlockHeight))

return nil
Expand Down
8 changes: 4 additions & 4 deletions block/internal/executing/executor_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {

// Now simulate creating a pending block at height 2 but not fully completing it
// This simulates a crash scenario where block data is saved but state isn't updated
currentState := exec1.GetLastState()
currentState := exec1.getLastState()
newHeight := currentState.LastBlockHeight + 1 // height 2

// Get validator hash properly
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
defer exec2.cancel()

// Verify that the state is at height 1 (pending block at height 2 wasn't committed)
currentState2 := exec2.GetLastState()
currentState2 := exec2.getLastState()
assert.Equal(t, uint64(1), currentState2.LastBlockHeight)

// When second executor tries to produce block at height 2, it should use pending data
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
assert.True(t, db2.called, "data broadcaster should be called")

// Verify the executor state was updated correctly
finalState := exec2.GetLastState()
finalState := exec2.getLastState()
assert.Equal(t, uint64(2), finalState.LastBlockHeight)
assert.Equal(t, []byte("new_root_2"), finalState.AppHash)

Expand Down Expand Up @@ -333,7 +333,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
defer exec2.cancel()

// Verify state loaded correctly
state := exec2.GetLastState()
state := exec2.getLastState()
assert.Equal(t, uint64(1), state.LastBlockHeight)

// Now produce next block - should go through normal sequencer flow since no pending block
Expand Down
34 changes: 15 additions & 19 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -45,8 +46,7 @@ type Submitter struct {
signer signer.Signer

// DA state
daIncludedHeight uint64
daStateMtx *sync.RWMutex
daIncludedHeight *atomic.Uint64

// Submission state to prevent concurrent submissions
headerSubmissionMtx sync.Mutex
Expand Down Expand Up @@ -78,17 +78,17 @@ func NewSubmitter(
errorCh chan<- error,
) *Submitter {
return &Submitter{
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
signer: signer,
daStateMtx: &sync.RWMutex{},
errorCh: errorCh,
logger: logger.With().Str("component", "submitter").Logger(),
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
signer: signer,
daIncludedHeight: &atomic.Uint64{},
errorCh: errorCh,
logger: logger.With().Str("component", "submitter").Logger(),
}
}

Expand Down Expand Up @@ -269,16 +269,12 @@ func (s *Submitter) setFinalWithRetry(nextHeight uint64) error {

// GetDAIncludedHeight returns the DA included height
func (s *Submitter) GetDAIncludedHeight() uint64 {
s.daStateMtx.RLock()
defer s.daStateMtx.RUnlock()
return s.daIncludedHeight
return s.daIncludedHeight.Load()
}

// SetDAIncludedHeight updates the DA included height
func (s *Submitter) SetDAIncludedHeight(height uint64) {
s.daStateMtx.Lock()
defer s.daStateMtx.Unlock()
s.daIncludedHeight = height
s.daIncludedHeight.Store(height)
}

// initializeDAIncludedHeight loads the DA included height from store
Expand Down
24 changes: 12 additions & 12 deletions block/internal/submitting/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -210,7 +210,7 @@ func TestSubmitter_initializeDAIncludedHeight(t *testing.T) {
binary.LittleEndian.PutUint64(bz, 7)
require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz))

s := &Submitter{store: st, daStateMtx: &sync.RWMutex{}, logger: zerolog.Nop()}
s := &Submitter{store: st, daIncludedHeight: &atomic.Uint64{}, logger: zerolog.Nop()}
require.NoError(t, s.initializeDAIncludedHeight(ctx))
assert.Equal(t, uint64(7), s.GetDAIncludedHeight())
}
Expand Down Expand Up @@ -324,16 +324,16 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) {

// Provide a minimal signer implementation
s := &Submitter{
store: st,
exec: exec,
cache: cm,
metrics: metrics,
config: cfg,
genesis: genesis.Genesis{},
daSubmitter: fakeDA,
signer: &fakeSigner{},
daStateMtx: &sync.RWMutex{},
logger: zerolog.Nop(),
store: st,
exec: exec,
cache: cm,
metrics: metrics,
config: cfg,
genesis: genesis.Genesis{},
daSubmitter: fakeDA,
signer: &fakeSigner{},
daIncludedHeight: &atomic.Uint64{},
logger: zerolog.Nop(),
}

// Make there be pending headers and data by setting store height > last submitted
Expand Down
55 changes: 30 additions & 25 deletions block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ type Syncer struct {
options common.BlockOptions

// State management
lastState types.State
lastStateMtx *sync.RWMutex
lastState *atomic.Pointer[types.State]

// DA state
daHeight uint64
daHeight *atomic.Uint64

// P2P stores
headerStore goheader.Store[*types.SignedHeader]
Expand Down Expand Up @@ -90,20 +89,21 @@ func NewSyncer(
errorCh chan<- error,
) *Syncer {
return &Syncer{
store: store,
exec: exec,
da: da,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
options: options,
headerStore: headerStore,
dataStore: dataStore,
lastStateMtx: &sync.RWMutex{},
heightInCh: make(chan common.DAHeightEvent, 10_000),
errorCh: errorCh,
logger: logger.With().Str("component", "syncer").Logger(),
store: store,
exec: exec,
da: da,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
options: options,
headerStore: headerStore,
dataStore: dataStore,
lastState: &atomic.Pointer[types.State]{},
daHeight: &atomic.Uint64{},
heightInCh: make(chan common.DAHeightEvent, 10_000),
errorCh: errorCh,
logger: logger.With().Str("component", "syncer").Logger(),
}
}

Expand Down Expand Up @@ -150,26 +150,31 @@ func (s *Syncer) Stop() error {

// GetLastState returns the current state
func (s *Syncer) GetLastState() types.State {
s.lastStateMtx.RLock()
defer s.lastStateMtx.RUnlock()
return s.lastState
state := s.lastState.Load()
if state == nil {
return types.State{}
}

stateCopy := *state
stateCopy.AppHash = bytes.Clone(state.AppHash)
stateCopy.LastResultsHash = bytes.Clone(state.LastResultsHash)

return stateCopy
}

// SetLastState updates the current state
func (s *Syncer) SetLastState(state types.State) {
s.lastStateMtx.Lock()
defer s.lastStateMtx.Unlock()
s.lastState = state
s.lastState.Store(&state)
}

// GetDAHeight returns the current DA height
func (s *Syncer) GetDAHeight() uint64 {
return atomic.LoadUint64(&s.daHeight)
return s.daHeight.Load()
}

// SetDAHeight updates the DA height
func (s *Syncer) SetDAHeight(height uint64) {
atomic.StoreUint64(&s.daHeight, height)
s.daHeight.Store(height)
}

// initializeState loads the current sync state
Expand Down
2 changes: 1 addition & 1 deletion scripts/test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ test-integration-cover:
## test-cover: generate code coverage report.
test-cover:
@echo "--> Running unit tests"
@go run -tags=cover scripts/test_cover.go
@go run -tags=cover -race scripts/test_cover.go
.PHONY: test-cover

## bench: run micro-benchmarks for internal cache
Expand Down
Loading