Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -50,8 +51,7 @@ type Executor struct {
options common.BlockOptions

// State management
lastState types.State
lastStateMtx *sync.RWMutex
lastState *atomic.Pointer[types.State]

// Channels for coordination
txNotifyCh chan struct{}
Expand Down Expand Up @@ -112,7 +112,7 @@ func NewExecutor(
headerBroadcaster: headerBroadcaster,
dataBroadcaster: dataBroadcaster,
options: options,
lastStateMtx: &sync.RWMutex{},
lastState: &atomic.Pointer[types.State]{},
txNotifyCh: make(chan struct{}, 1),
errorCh: errorCh,
logger: logger.With().Str("component", "executor").Logger(),
Expand Down Expand Up @@ -152,16 +152,17 @@ func (e *Executor) Stop() error {

// GetLastState returns the current state
func (e *Executor) GetLastState() types.State {
e.lastStateMtx.RLock()
defer e.lastStateMtx.RUnlock()
return e.lastState
state := e.lastState.Load()
if state == nil {
// Return zero value if not initialized
return types.State{}
}
return *state
}

// SetLastState updates the current state
func (e *Executor) SetLastState(state types.State) {
e.lastStateMtx.Lock()
defer e.lastStateMtx.Unlock()
e.lastState = state
e.lastState.Store(&state)
}

// NotifyNewTransactions signals that new transactions are available
Expand Down
34 changes: 15 additions & 19 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -45,8 +46,7 @@ type Submitter struct {
signer signer.Signer

// DA state
daIncludedHeight uint64
daStateMtx *sync.RWMutex
daIncludedHeight *atomic.Uint64

// Submission state to prevent concurrent submissions
headerSubmissionMtx sync.Mutex
Expand Down Expand Up @@ -78,17 +78,17 @@ func NewSubmitter(
errorCh chan<- error,
) *Submitter {
return &Submitter{
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
signer: signer,
daStateMtx: &sync.RWMutex{},
errorCh: errorCh,
logger: logger.With().Str("component", "submitter").Logger(),
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
signer: signer,
daIncludedHeight: &atomic.Uint64{},
errorCh: errorCh,
logger: logger.With().Str("component", "submitter").Logger(),
}
}

Expand Down Expand Up @@ -269,16 +269,12 @@ func (s *Submitter) setFinalWithRetry(nextHeight uint64) error {

// GetDAIncludedHeight returns the DA included height
func (s *Submitter) GetDAIncludedHeight() uint64 {
s.daStateMtx.RLock()
defer s.daStateMtx.RUnlock()
return s.daIncludedHeight
return s.daIncludedHeight.Load()
}

// SetDAIncludedHeight updates the DA included height
func (s *Submitter) SetDAIncludedHeight(height uint64) {
s.daStateMtx.Lock()
defer s.daStateMtx.Unlock()
s.daIncludedHeight = height
s.daIncludedHeight.Store(height)
}

// initializeDAIncludedHeight loads the DA included height from store
Expand Down
24 changes: 12 additions & 12 deletions block/internal/submitting/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -210,7 +210,7 @@ func TestSubmitter_initializeDAIncludedHeight(t *testing.T) {
binary.LittleEndian.PutUint64(bz, 7)
require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz))

s := &Submitter{store: st, daStateMtx: &sync.RWMutex{}, logger: zerolog.Nop()}
s := &Submitter{store: st, daIncludedHeight: &atomic.Uint64{}, logger: zerolog.Nop()}
require.NoError(t, s.initializeDAIncludedHeight(ctx))
assert.Equal(t, uint64(7), s.GetDAIncludedHeight())
}
Expand Down Expand Up @@ -324,16 +324,16 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) {

// Provide a minimal signer implementation
s := &Submitter{
store: st,
exec: exec,
cache: cm,
metrics: metrics,
config: cfg,
genesis: genesis.Genesis{},
daSubmitter: fakeDA,
signer: &fakeSigner{},
daStateMtx: &sync.RWMutex{},
logger: zerolog.Nop(),
store: st,
exec: exec,
cache: cm,
metrics: metrics,
config: cfg,
genesis: genesis.Genesis{},
daSubmitter: fakeDA,
signer: &fakeSigner{},
daIncludedHeight: &atomic.Uint64{},
logger: zerolog.Nop(),
}

// Make there be pending headers and data by setting store height > last submitted
Expand Down
Loading