Skip to content

Commit 528081b

Browse files
authored
perf(block): use sync/atomic instead of mutexes (#2735)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview Use sync/atomic` instead of mutexes. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent b646e66 commit 528081b

File tree

6 files changed

+91
-79
lines changed

6 files changed

+91
-79
lines changed

block/internal/executing/executor.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/ipfs/go-datastore"
@@ -50,8 +51,7 @@ type Executor struct {
5051
options common.BlockOptions
5152

5253
// State management
53-
lastState types.State
54-
lastStateMtx *sync.RWMutex
54+
lastState *atomic.Pointer[types.State]
5555

5656
// Channels for coordination
5757
txNotifyCh chan struct{}
@@ -112,7 +112,7 @@ func NewExecutor(
112112
headerBroadcaster: headerBroadcaster,
113113
dataBroadcaster: dataBroadcaster,
114114
options: options,
115-
lastStateMtx: &sync.RWMutex{},
115+
lastState: &atomic.Pointer[types.State]{},
116116
txNotifyCh: make(chan struct{}, 1),
117117
errorCh: errorCh,
118118
logger: logger.With().Str("component", "executor").Logger(),
@@ -150,18 +150,29 @@ func (e *Executor) Stop() error {
150150
return nil
151151
}
152152

153-
// GetLastState returns the current state
153+
// GetLastState returns the current state.
154154
func (e *Executor) GetLastState() types.State {
155-
e.lastStateMtx.RLock()
156-
defer e.lastStateMtx.RUnlock()
157-
return e.lastState
155+
state := e.getLastState()
156+
state.AppHash = bytes.Clone(state.AppHash)
157+
state.LastResultsHash = bytes.Clone(state.LastResultsHash)
158+
159+
return state
160+
}
161+
162+
// getLastState returns the current state.
163+
// getLastState should never directly mutate.
164+
func (e *Executor) getLastState() types.State {
165+
state := e.lastState.Load()
166+
if state == nil {
167+
return types.State{}
168+
}
169+
170+
return *state
158171
}
159172

160-
// SetLastState updates the current state
161-
func (e *Executor) SetLastState(state types.State) {
162-
e.lastStateMtx.Lock()
163-
defer e.lastStateMtx.Unlock()
164-
e.lastState = state
173+
// setLastState updates the current state
174+
func (e *Executor) setLastState(state types.State) {
175+
e.lastState.Store(&state)
165176
}
166177

167178
// NotifyNewTransactions signals that new transactions are available
@@ -198,7 +209,7 @@ func (e *Executor) initializeState() error {
198209
}
199210
}
200211

201-
e.SetLastState(state)
212+
e.setLastState(state)
202213

203214
// Set store height
204215
if err := e.store.SetHeight(e.ctx, state.LastBlockHeight); err != nil {
@@ -218,7 +229,7 @@ func (e *Executor) executionLoop() {
218229

219230
var delay time.Duration
220231
initialHeight := e.genesis.InitialHeight
221-
currentState := e.GetLastState()
232+
currentState := e.getLastState()
222233

223234
if currentState.LastBlockHeight < initialHeight {
224235
delay = time.Until(e.genesis.StartTime.Add(e.config.Node.BlockTime.Duration))
@@ -291,7 +302,7 @@ func (e *Executor) produceBlock() error {
291302
}
292303
}()
293304

294-
currentState := e.GetLastState()
305+
currentState := e.getLastState()
295306
newHeight := currentState.LastBlockHeight + 1
296307

297308
e.logger.Debug().Uint64("height", newHeight).Msg("producing block")
@@ -429,7 +440,7 @@ func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) {
429440

430441
// createBlock creates a new block from the given batch
431442
func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
432-
currentState := e.GetLastState()
443+
currentState := e.getLastState()
433444
headerTime := uint64(e.genesis.StartTime.UnixNano())
434445

435446
// Get last block info
@@ -518,7 +529,7 @@ func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *Ba
518529

519530
// applyBlock applies the block to get the new state
520531
func (e *Executor) applyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) {
521-
currentState := e.GetLastState()
532+
currentState := e.getLastState()
522533

523534
// Prepare transactions
524535
rawTxs := make([][]byte, len(data.Txs))
@@ -601,7 +612,7 @@ func (e *Executor) updateState(ctx context.Context, newState types.State) error
601612
return err
602613
}
603614

604-
e.SetLastState(newState)
615+
e.setLastState(newState)
605616
e.metrics.Height.Set(float64(newState.LastBlockHeight))
606617

607618
return nil

block/internal/executing/executor_restart_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
105105

106106
// Now simulate creating a pending block at height 2 but not fully completing it
107107
// This simulates a crash scenario where block data is saved but state isn't updated
108-
currentState := exec1.GetLastState()
108+
currentState := exec1.getLastState()
109109
newHeight := currentState.LastBlockHeight + 1 // height 2
110110

111111
// Get validator hash properly
@@ -189,7 +189,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
189189
defer exec2.cancel()
190190

191191
// Verify that the state is at height 1 (pending block at height 2 wasn't committed)
192-
currentState2 := exec2.GetLastState()
192+
currentState2 := exec2.getLastState()
193193
assert.Equal(t, uint64(1), currentState2.LastBlockHeight)
194194

195195
// When second executor tries to produce block at height 2, it should use pending data
@@ -225,7 +225,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
225225
assert.True(t, db2.called, "data broadcaster should be called")
226226

227227
// Verify the executor state was updated correctly
228-
finalState := exec2.GetLastState()
228+
finalState := exec2.getLastState()
229229
assert.Equal(t, uint64(2), finalState.LastBlockHeight)
230230
assert.Equal(t, []byte("new_root_2"), finalState.AppHash)
231231

@@ -333,7 +333,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
333333
defer exec2.cancel()
334334

335335
// Verify state loaded correctly
336-
state := exec2.GetLastState()
336+
state := exec2.getLastState()
337337
assert.Equal(t, uint64(1), state.LastBlockHeight)
338338

339339
// Now produce next block - should go through normal sequencer flow since no pending block

block/internal/submitting/submitter.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/binary"
77
"fmt"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/rs/zerolog"
@@ -45,8 +46,7 @@ type Submitter struct {
4546
signer signer.Signer
4647

4748
// DA state
48-
daIncludedHeight uint64
49-
daStateMtx *sync.RWMutex
49+
daIncludedHeight *atomic.Uint64
5050

5151
// Submission state to prevent concurrent submissions
5252
headerSubmissionMtx sync.Mutex
@@ -78,17 +78,17 @@ func NewSubmitter(
7878
errorCh chan<- error,
7979
) *Submitter {
8080
return &Submitter{
81-
store: store,
82-
exec: exec,
83-
cache: cache,
84-
metrics: metrics,
85-
config: config,
86-
genesis: genesis,
87-
daSubmitter: daSubmitter,
88-
signer: signer,
89-
daStateMtx: &sync.RWMutex{},
90-
errorCh: errorCh,
91-
logger: logger.With().Str("component", "submitter").Logger(),
81+
store: store,
82+
exec: exec,
83+
cache: cache,
84+
metrics: metrics,
85+
config: config,
86+
genesis: genesis,
87+
daSubmitter: daSubmitter,
88+
signer: signer,
89+
daIncludedHeight: &atomic.Uint64{},
90+
errorCh: errorCh,
91+
logger: logger.With().Str("component", "submitter").Logger(),
9292
}
9393
}
9494

@@ -269,16 +269,12 @@ func (s *Submitter) setFinalWithRetry(nextHeight uint64) error {
269269

270270
// GetDAIncludedHeight returns the DA included height
271271
func (s *Submitter) GetDAIncludedHeight() uint64 {
272-
s.daStateMtx.RLock()
273-
defer s.daStateMtx.RUnlock()
274-
return s.daIncludedHeight
272+
return s.daIncludedHeight.Load()
275273
}
276274

277275
// SetDAIncludedHeight updates the DA included height
278276
func (s *Submitter) SetDAIncludedHeight(height uint64) {
279-
s.daStateMtx.Lock()
280-
defer s.daStateMtx.Unlock()
281-
s.daIncludedHeight = height
277+
s.daIncludedHeight.Store(height)
282278
}
283279

284280
// initializeDAIncludedHeight loads the DA included height from store

block/internal/submitting/submitter_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"encoding/binary"
66
"errors"
77
"fmt"
8-
"sync"
8+
"sync/atomic"
99
"testing"
1010
"time"
1111

@@ -210,7 +210,7 @@ func TestSubmitter_initializeDAIncludedHeight(t *testing.T) {
210210
binary.LittleEndian.PutUint64(bz, 7)
211211
require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz))
212212

213-
s := &Submitter{store: st, daStateMtx: &sync.RWMutex{}, logger: zerolog.Nop()}
213+
s := &Submitter{store: st, daIncludedHeight: &atomic.Uint64{}, logger: zerolog.Nop()}
214214
require.NoError(t, s.initializeDAIncludedHeight(ctx))
215215
assert.Equal(t, uint64(7), s.GetDAIncludedHeight())
216216
}
@@ -324,16 +324,16 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) {
324324

325325
// Provide a minimal signer implementation
326326
s := &Submitter{
327-
store: st,
328-
exec: exec,
329-
cache: cm,
330-
metrics: metrics,
331-
config: cfg,
332-
genesis: genesis.Genesis{},
333-
daSubmitter: fakeDA,
334-
signer: &fakeSigner{},
335-
daStateMtx: &sync.RWMutex{},
336-
logger: zerolog.Nop(),
327+
store: st,
328+
exec: exec,
329+
cache: cm,
330+
metrics: metrics,
331+
config: cfg,
332+
genesis: genesis.Genesis{},
333+
daSubmitter: fakeDA,
334+
signer: &fakeSigner{},
335+
daIncludedHeight: &atomic.Uint64{},
336+
logger: zerolog.Nop(),
337337
}
338338

339339
// Make there be pending headers and data by setting store height > last submitted

block/internal/syncing/syncer.go

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,10 @@ type Syncer struct {
4747
options common.BlockOptions
4848

4949
// State management
50-
lastState types.State
51-
lastStateMtx *sync.RWMutex
50+
lastState *atomic.Pointer[types.State]
5251

5352
// DA state
54-
daHeight uint64
53+
daHeight *atomic.Uint64
5554

5655
// P2P stores
5756
headerStore goheader.Store[*types.SignedHeader]
@@ -90,20 +89,21 @@ func NewSyncer(
9089
errorCh chan<- error,
9190
) *Syncer {
9291
return &Syncer{
93-
store: store,
94-
exec: exec,
95-
da: da,
96-
cache: cache,
97-
metrics: metrics,
98-
config: config,
99-
genesis: genesis,
100-
options: options,
101-
headerStore: headerStore,
102-
dataStore: dataStore,
103-
lastStateMtx: &sync.RWMutex{},
104-
heightInCh: make(chan common.DAHeightEvent, 10_000),
105-
errorCh: errorCh,
106-
logger: logger.With().Str("component", "syncer").Logger(),
92+
store: store,
93+
exec: exec,
94+
da: da,
95+
cache: cache,
96+
metrics: metrics,
97+
config: config,
98+
genesis: genesis,
99+
options: options,
100+
headerStore: headerStore,
101+
dataStore: dataStore,
102+
lastState: &atomic.Pointer[types.State]{},
103+
daHeight: &atomic.Uint64{},
104+
heightInCh: make(chan common.DAHeightEvent, 10_000),
105+
errorCh: errorCh,
106+
logger: logger.With().Str("component", "syncer").Logger(),
107107
}
108108
}
109109

@@ -150,26 +150,31 @@ func (s *Syncer) Stop() error {
150150

151151
// GetLastState returns the current state
152152
func (s *Syncer) GetLastState() types.State {
153-
s.lastStateMtx.RLock()
154-
defer s.lastStateMtx.RUnlock()
155-
return s.lastState
153+
state := s.lastState.Load()
154+
if state == nil {
155+
return types.State{}
156+
}
157+
158+
stateCopy := *state
159+
stateCopy.AppHash = bytes.Clone(state.AppHash)
160+
stateCopy.LastResultsHash = bytes.Clone(state.LastResultsHash)
161+
162+
return stateCopy
156163
}
157164

158165
// SetLastState updates the current state
159166
func (s *Syncer) SetLastState(state types.State) {
160-
s.lastStateMtx.Lock()
161-
defer s.lastStateMtx.Unlock()
162-
s.lastState = state
167+
s.lastState.Store(&state)
163168
}
164169

165170
// GetDAHeight returns the current DA height
166171
func (s *Syncer) GetDAHeight() uint64 {
167-
return atomic.LoadUint64(&s.daHeight)
172+
return s.daHeight.Load()
168173
}
169174

170175
// SetDAHeight updates the DA height
171176
func (s *Syncer) SetDAHeight(height uint64) {
172-
atomic.StoreUint64(&s.daHeight, height)
177+
s.daHeight.Store(height)
173178
}
174179

175180
// initializeState loads the current sync state

scripts/test.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ test-integration-cover:
3636
## test-cover: generate code coverage report.
3737
test-cover:
3838
@echo "--> Running unit tests"
39-
@go run -tags=cover scripts/test_cover.go
39+
@go run -tags=cover -race scripts/test_cover.go
4040
.PHONY: test-cover
4141

4242
## bench: run micro-benchmarks for internal cache

0 commit comments

Comments
 (0)