diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 2d93a7c46..2746ddc0c 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/ipfs/go-datastore" @@ -50,8 +51,7 @@ type Executor struct { options common.BlockOptions // State management - lastState types.State - lastStateMtx *sync.RWMutex + lastState *atomic.Pointer[types.State] // Channels for coordination txNotifyCh chan struct{} @@ -112,7 +112,7 @@ func NewExecutor( headerBroadcaster: headerBroadcaster, dataBroadcaster: dataBroadcaster, options: options, - lastStateMtx: &sync.RWMutex{}, + lastState: &atomic.Pointer[types.State]{}, txNotifyCh: make(chan struct{}, 1), errorCh: errorCh, logger: logger.With().Str("component", "executor").Logger(), @@ -150,18 +150,29 @@ func (e *Executor) Stop() error { return nil } -// GetLastState returns the current state +// GetLastState returns the current state. func (e *Executor) GetLastState() types.State { - e.lastStateMtx.RLock() - defer e.lastStateMtx.RUnlock() - return e.lastState + state := e.getLastState() + state.AppHash = bytes.Clone(state.AppHash) + state.LastResultsHash = bytes.Clone(state.LastResultsHash) + + return state +} + +// getLastState returns the current state. +// getLastState should never directly mutate. +func (e *Executor) getLastState() types.State { + state := e.lastState.Load() + if state == nil { + return types.State{} + } + + return *state } -// SetLastState updates the current state -func (e *Executor) SetLastState(state types.State) { - e.lastStateMtx.Lock() - defer e.lastStateMtx.Unlock() - e.lastState = state +// setLastState updates the current state +func (e *Executor) setLastState(state types.State) { + e.lastState.Store(&state) } // NotifyNewTransactions signals that new transactions are available @@ -198,7 +209,7 @@ func (e *Executor) initializeState() error { } } - e.SetLastState(state) + e.setLastState(state) // Set store height if err := e.store.SetHeight(e.ctx, state.LastBlockHeight); err != nil { @@ -218,7 +229,7 @@ func (e *Executor) executionLoop() { var delay time.Duration initialHeight := e.genesis.InitialHeight - currentState := e.GetLastState() + currentState := e.getLastState() if currentState.LastBlockHeight < initialHeight { delay = time.Until(e.genesis.StartTime.Add(e.config.Node.BlockTime.Duration)) @@ -291,7 +302,7 @@ func (e *Executor) produceBlock() error { } }() - currentState := e.GetLastState() + currentState := e.getLastState() newHeight := currentState.LastBlockHeight + 1 e.logger.Debug().Uint64("height", newHeight).Msg("producing block") @@ -429,7 +440,7 @@ func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) { // createBlock creates a new block from the given batch func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) { - currentState := e.GetLastState() + currentState := e.getLastState() headerTime := uint64(e.genesis.StartTime.UnixNano()) // Get last block info @@ -518,7 +529,7 @@ func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *Ba // applyBlock applies the block to get the new state func (e *Executor) applyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) { - currentState := e.GetLastState() + currentState := e.getLastState() // Prepare transactions rawTxs := make([][]byte, len(data.Txs)) @@ -601,7 +612,7 @@ func (e *Executor) updateState(ctx context.Context, newState types.State) error return err } - e.SetLastState(newState) + e.setLastState(newState) e.metrics.Height.Set(float64(newState.LastBlockHeight)) return nil diff --git a/block/internal/executing/executor_restart_test.go b/block/internal/executing/executor_restart_test.go index 4725aeb1b..22182873e 100644 --- a/block/internal/executing/executor_restart_test.go +++ b/block/internal/executing/executor_restart_test.go @@ -105,7 +105,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { // Now simulate creating a pending block at height 2 but not fully completing it // This simulates a crash scenario where block data is saved but state isn't updated - currentState := exec1.GetLastState() + currentState := exec1.getLastState() newHeight := currentState.LastBlockHeight + 1 // height 2 // Get validator hash properly @@ -189,7 +189,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { defer exec2.cancel() // Verify that the state is at height 1 (pending block at height 2 wasn't committed) - currentState2 := exec2.GetLastState() + currentState2 := exec2.getLastState() assert.Equal(t, uint64(1), currentState2.LastBlockHeight) // When second executor tries to produce block at height 2, it should use pending data @@ -225,7 +225,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) { assert.True(t, db2.called, "data broadcaster should be called") // Verify the executor state was updated correctly - finalState := exec2.GetLastState() + finalState := exec2.getLastState() assert.Equal(t, uint64(2), finalState.LastBlockHeight) assert.Equal(t, []byte("new_root_2"), finalState.AppHash) @@ -333,7 +333,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) { defer exec2.cancel() // Verify state loaded correctly - state := exec2.GetLastState() + state := exec2.getLastState() assert.Equal(t, uint64(1), state.LastBlockHeight) // Now produce next block - should go through normal sequencer flow since no pending block diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 4606047b1..76ae5b69c 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "fmt" "sync" + "sync/atomic" "time" "github.com/rs/zerolog" @@ -45,8 +46,7 @@ type Submitter struct { signer signer.Signer // DA state - daIncludedHeight uint64 - daStateMtx *sync.RWMutex + daIncludedHeight *atomic.Uint64 // Submission state to prevent concurrent submissions headerSubmissionMtx sync.Mutex @@ -78,17 +78,17 @@ func NewSubmitter( errorCh chan<- error, ) *Submitter { return &Submitter{ - store: store, - exec: exec, - cache: cache, - metrics: metrics, - config: config, - genesis: genesis, - daSubmitter: daSubmitter, - signer: signer, - daStateMtx: &sync.RWMutex{}, - errorCh: errorCh, - logger: logger.With().Str("component", "submitter").Logger(), + store: store, + exec: exec, + cache: cache, + metrics: metrics, + config: config, + genesis: genesis, + daSubmitter: daSubmitter, + signer: signer, + daIncludedHeight: &atomic.Uint64{}, + errorCh: errorCh, + logger: logger.With().Str("component", "submitter").Logger(), } } @@ -269,16 +269,12 @@ func (s *Submitter) setFinalWithRetry(nextHeight uint64) error { // GetDAIncludedHeight returns the DA included height func (s *Submitter) GetDAIncludedHeight() uint64 { - s.daStateMtx.RLock() - defer s.daStateMtx.RUnlock() - return s.daIncludedHeight + return s.daIncludedHeight.Load() } // SetDAIncludedHeight updates the DA included height func (s *Submitter) SetDAIncludedHeight(height uint64) { - s.daStateMtx.Lock() - defer s.daStateMtx.Unlock() - s.daIncludedHeight = height + s.daIncludedHeight.Store(height) } // initializeDAIncludedHeight loads the DA included height from store diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index c0dd4281b..9e36f281e 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -5,7 +5,7 @@ import ( "encoding/binary" "errors" "fmt" - "sync" + "sync/atomic" "testing" "time" @@ -210,7 +210,7 @@ func TestSubmitter_initializeDAIncludedHeight(t *testing.T) { binary.LittleEndian.PutUint64(bz, 7) require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz)) - s := &Submitter{store: st, daStateMtx: &sync.RWMutex{}, logger: zerolog.Nop()} + s := &Submitter{store: st, daIncludedHeight: &atomic.Uint64{}, logger: zerolog.Nop()} require.NoError(t, s.initializeDAIncludedHeight(ctx)) assert.Equal(t, uint64(7), s.GetDAIncludedHeight()) } @@ -324,16 +324,16 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) { // Provide a minimal signer implementation s := &Submitter{ - store: st, - exec: exec, - cache: cm, - metrics: metrics, - config: cfg, - genesis: genesis.Genesis{}, - daSubmitter: fakeDA, - signer: &fakeSigner{}, - daStateMtx: &sync.RWMutex{}, - logger: zerolog.Nop(), + store: st, + exec: exec, + cache: cm, + metrics: metrics, + config: cfg, + genesis: genesis.Genesis{}, + daSubmitter: fakeDA, + signer: &fakeSigner{}, + daIncludedHeight: &atomic.Uint64{}, + logger: zerolog.Nop(), } // Make there be pending headers and data by setting store height > last submitted diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 440a08a9c..4e64188b0 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -47,11 +47,10 @@ type Syncer struct { options common.BlockOptions // State management - lastState types.State - lastStateMtx *sync.RWMutex + lastState *atomic.Pointer[types.State] // DA state - daHeight uint64 + daHeight *atomic.Uint64 // P2P stores headerStore goheader.Store[*types.SignedHeader] @@ -90,20 +89,21 @@ func NewSyncer( errorCh chan<- error, ) *Syncer { return &Syncer{ - store: store, - exec: exec, - da: da, - cache: cache, - metrics: metrics, - config: config, - genesis: genesis, - options: options, - headerStore: headerStore, - dataStore: dataStore, - lastStateMtx: &sync.RWMutex{}, - heightInCh: make(chan common.DAHeightEvent, 10_000), - errorCh: errorCh, - logger: logger.With().Str("component", "syncer").Logger(), + store: store, + exec: exec, + da: da, + cache: cache, + metrics: metrics, + config: config, + genesis: genesis, + options: options, + headerStore: headerStore, + dataStore: dataStore, + lastState: &atomic.Pointer[types.State]{}, + daHeight: &atomic.Uint64{}, + heightInCh: make(chan common.DAHeightEvent, 10_000), + errorCh: errorCh, + logger: logger.With().Str("component", "syncer").Logger(), } } @@ -150,26 +150,31 @@ func (s *Syncer) Stop() error { // GetLastState returns the current state func (s *Syncer) GetLastState() types.State { - s.lastStateMtx.RLock() - defer s.lastStateMtx.RUnlock() - return s.lastState + state := s.lastState.Load() + if state == nil { + return types.State{} + } + + stateCopy := *state + stateCopy.AppHash = bytes.Clone(state.AppHash) + stateCopy.LastResultsHash = bytes.Clone(state.LastResultsHash) + + return stateCopy } // SetLastState updates the current state func (s *Syncer) SetLastState(state types.State) { - s.lastStateMtx.Lock() - defer s.lastStateMtx.Unlock() - s.lastState = state + s.lastState.Store(&state) } // GetDAHeight returns the current DA height func (s *Syncer) GetDAHeight() uint64 { - return atomic.LoadUint64(&s.daHeight) + return s.daHeight.Load() } // SetDAHeight updates the DA height func (s *Syncer) SetDAHeight(height uint64) { - atomic.StoreUint64(&s.daHeight, height) + s.daHeight.Store(height) } // initializeState loads the current sync state diff --git a/scripts/test.mk b/scripts/test.mk index a4daf3608..882deada2 100644 --- a/scripts/test.mk +++ b/scripts/test.mk @@ -36,7 +36,7 @@ test-integration-cover: ## test-cover: generate code coverage report. test-cover: @echo "--> Running unit tests" - @go run -tags=cover scripts/test_cover.go + @go run -tags=cover -race scripts/test_cover.go .PHONY: test-cover ## bench: run micro-benchmarks for internal cache