Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4a3db54
fixes exec queue add and pop
AdoAdoAdo Jan 5, 2026
60655ce
fixes
AdoAdoAdo Jan 8, 2026
207937d
clean channel only when queue becomes empty
AdoAdoAdo Jan 8, 2026
82892b5
Merge branch 'feat/supernova-async-exec' into fix-race-execution
sstanculeanu Jan 8, 2026
09ba7f3
Merge branch 'feat/supernova-async-exec' into fix-race-execution
AdoAdoAdo Jan 8, 2026
d1b5c2c
fix trigger epoch start on boot
ssd04 Jan 8, 2026
4e40327
Merge branch 'feat/supernova-async-exec' into fix-race-execution
AdoAdoAdo Jan 9, 2026
aa30a9b
do not early return if one expected epoch not found
ssd04 Jan 9, 2026
c4a2a02
fix consensus revert
sstanculeanu Jan 12, 2026
dafbec8
Merge branch 'feat/supernova-async-exec' into fix-race-execution
AdoAdoAdo Jan 12, 2026
839b187
Merge pull request #7596 from multiversx/fix-trigger-epoch-start-on-boot
AdoAdoAdo Jan 12, 2026
7cfed02
Merge pull request #7600 from multiversx/fix-consensus-revert
AdoAdoAdo Jan 12, 2026
9c8c1ef
fix consensus state nil header
sstanculeanu Jan 12, 2026
45b9329
fix pending mini blocks computation for epoch start data on v3 metablock
AdoAdoAdo Jan 12, 2026
47f3ab5
Merge pull request #7602 from multiversx/fix-nil-cns-state-header
sstanculeanu Jan 12, 2026
bdab59b
Merge remote-tracking branch 'origin/fix-race-execution' into epoch-c…
AdoAdoAdo Jan 12, 2026
944b95f
fix unit test
AdoAdoAdo Jan 12, 2026
cf0042e
fix unit test
AdoAdoAdo Jan 13, 2026
06fe612
Merge branch 'feat/supernova-async-exec' into fix-race-execution
AdoAdoAdo Jan 13, 2026
7112d80
Merge remote-tracking branch 'origin/fix-race-execution' into epoch-c…
AdoAdoAdo Jan 13, 2026
0b6f2f8
Merge pull request #7603 from multiversx/epoch-change-fixes
AdoAdoAdo Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions consensus/spos/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,11 +864,17 @@ func (wrk *Worker) Extend(subroundId int) {
time.Sleep(time.Millisecond)
}

log.Debug("current block is reverted")

header := wrk.consensusState.GetHeader()
isHeaderV3 := header.IsHeaderV3()
if isHeaderV3 {
return
}

wrk.scheduledProcessor.ForceStopScheduledExecutionBlocking()
wrk.blockProcessor.RevertCurrentBlock(wrk.consensusState.GetHeader())
wrk.blockProcessor.RevertCurrentBlock(header)
wrk.removeConsensusHeaderFromPool()

log.Debug("current block is reverted")
}

func (wrk *Worker) removeConsensusHeaderFromPool() {
Expand Down
28 changes: 23 additions & 5 deletions epochStart/metachain/epochStartData.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (e *epochStartData) CreateEpochStartData() (*block.EpochStart, error) {
func (e *epochStartData) CreateEpochStartShardDataMetablockV3(metablock data.MetaHeaderHandler) ([]block.EpochStartShardData, error) {
log.Debug("CreateEpochStartShardDataMetablockV3",
"metablock epoch", metablock.GetEpoch(),
"for epoch", metablock.GetEpoch()+1,
"isEpochChangeProposed", metablock.IsEpochChangeProposed(),
"trigger epoch", e.epochStartTrigger.Epoch())

Expand Down Expand Up @@ -390,17 +391,34 @@ func (e *epochStartData) getShardDataFromEpochStartData(
return nil, nil, process.ErrGettingShardDataFromEpochStartData
}

func (e *epochStartData) getPrevEpoch() uint32 {
prevEpoch := e.genesisEpoch

epochStartTriggerEpoch := e.epochStartTrigger.Epoch()

if epochStartTriggerEpoch <= e.genesisEpoch {
return prevEpoch
}

prevEpoch = e.epochStartTrigger.Epoch()

isAfterSupernova := epochStartTriggerEpoch > e.enableEpochsHandler.GetActivationEpoch(common.SupernovaFlag)
if !isAfterSupernova {
prevEpoch--
}

return prevEpoch
}

func (e *epochStartData) computePendingMiniBlockList(
startData *block.EpochStart,
allShardHdrList [][]data.HeaderHandler,
) ([]block.MiniBlockHeader, error) {

prevEpoch := e.genesisEpoch
if e.epochStartTrigger.Epoch() > e.genesisEpoch {
prevEpoch = e.epochStartTrigger.Epoch() - 1
}
prevEpoch := e.getPrevEpoch()

epochStartIdentifier := core.EpochStartIdentifier(prevEpoch)

// TODO: analyse error handling here
previousEpochStartMeta, _ := process.GetMetaHeaderFromStorage([]byte(epochStartIdentifier), e.marshalizer, e.store)

allPending := make([]block.MiniBlockHeader, 0)
Expand Down
10 changes: 5 additions & 5 deletions epochStart/metachain/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (t *trigger) Update(round uint64, nonce uint64) {
}

if t.shouldTriggerEpochStart(round, nonce) {
t.setEpochChange(round)
t.setEpochChange(round, t.epoch+1)
}
}

Expand All @@ -270,11 +270,11 @@ func (t *trigger) SetEpochChange(round uint64) {
t.mutTrigger.Lock()
defer t.mutTrigger.Unlock()

t.setEpochChange(round)
t.setEpochChange(round, t.epoch+1)
}

func (t *trigger) setEpochChange(round uint64) {
t.epoch += 1
func (t *trigger) setEpochChange(round uint64, epoch uint32) {
t.epoch = epoch
t.isEpochStart = true
t.prevEpochStartRound = t.currEpochStartRound
t.currEpochStartRound = round
Expand All @@ -300,7 +300,7 @@ func (t *trigger) SetProcessed(header data.HeaderHandler, body data.BodyHandler)
}

if header.IsHeaderV3() {
t.setEpochChange(header.GetRound())
t.setEpochChange(header.GetRound(), header.GetEpoch())
}

metaBuff, errNotCritical := t.marshaller.Marshal(metaBlock)
Expand Down
3 changes: 3 additions & 0 deletions process/asyncExecution/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ var ErrNilExecutionTracker = errors.New("nil execution tracker")

// ErrNilBlockProcessor signals that a nil block processor has been provided
var ErrNilBlockProcessor = errors.New("nil block processor")

// ErrNilExecutionResult signals that a nil execution result has been provided
var ErrNilExecutionResult = errors.New("nil execution result")
53 changes: 50 additions & 3 deletions process/asyncExecution/headersExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ var log = logger.GetOrCreate("process/asyncExecution")

const timeToSleep = time.Millisecond * 5
const timeToSleepOnError = time.Millisecond * 300
const maxRetryAttempts = 10
const maxBackoffTime = time.Second * 5
const validationInterval = time.Minute * 1

// ArgsHeadersExecutor holds all the components needed to create a new instance of *headersExecutor
type ArgsHeadersExecutor struct {
Expand Down Expand Up @@ -72,6 +75,8 @@ func (he *headersExecutor) StartExecution() {

// PauseExecution pauses the execution
func (he *headersExecutor) PauseExecution() {
log.Debug("headersExecutor.PauseExecution: pausing execution")

he.mutPaused.Lock()
defer he.mutPaused.Unlock()

Expand All @@ -84,6 +89,8 @@ func (he *headersExecutor) PauseExecution() {

// ResumeExecution resumes the execution
func (he *headersExecutor) ResumeExecution() {
log.Debug("headersExecutor.ResumeExecution: resuming execution")

he.mutPaused.Lock()
defer he.mutPaused.Unlock()

Expand All @@ -93,10 +100,19 @@ func (he *headersExecutor) ResumeExecution() {
func (he *headersExecutor) start(ctx context.Context) {
log.Debug("headersExecutor.start: starting execution")

validationTicker := time.NewTicker(validationInterval)
defer validationTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-validationTicker.C:
// Periodic queue validation
err := he.blocksQueue.ValidateQueueIntegrity()
if err != nil {
log.Error("headersExecutor.start: queue integrity validation failed", "err", err)
}
default:
he.mutPaused.RLock()
isPaused := he.isPaused
Expand Down Expand Up @@ -129,43 +145,74 @@ func (he *headersExecutor) start(ctx context.Context) {
}

func (he *headersExecutor) handleProcessError(ctx context.Context, pair queue.HeaderBodyPair) {
for {
retryCount := 0
backoffTime := timeToSleepOnError

for retryCount < maxRetryAttempts {
pairFromQueue, ok := he.blocksQueue.Peek()
if ok && pairFromQueue.Header.GetNonce() == pair.Header.GetNonce() {
// continue the processing (pop the next header from queue)
return
}

select {
case <-ctx.Done():
return
default:
// retry with the same pair
err := he.process(pair)
if err == nil {
log.Debug("headersExecutor.handleProcessError - retry succeeded",
"nonce", pair.Header.GetNonce(),
"retry_count", retryCount)
return
}
time.Sleep(timeToSleepOnError)
retryCount++
log.Warn("headersExecutor.handleProcessError - retry failed",
"nonce", pair.Header.GetNonce(),
"retry_count", retryCount,
"max_retries", maxRetryAttempts,
"err", err)

// Exponential backoff with maximum limit
time.Sleep(backoffTime)
backoffTime = backoffTime * 2
if backoffTime > maxBackoffTime {
backoffTime = maxBackoffTime
}
}
}

log.Error("headersExecutor.handleProcessError - max retries exceeded, skipping block",
"nonce", pair.Header.GetNonce(),
"max_retries", maxRetryAttempts)
}

func (he *headersExecutor) process(pair queue.HeaderBodyPair) error {
executionResult, err := he.blockProcessor.ProcessBlockProposal(pair.Header, pair.Body)
if err != nil {
log.Warn("headersExecutor.process process block failed",
"nonce", pair.Header.GetNonce(),
"prevHash", pair.Header.GetPrevHash(),
"err", err,
)
return err
}

// Validate execution result
if check.IfNil(executionResult) {
log.Warn("headersExecutor.process - nil execution result received",
"nonce", pair.Header.GetNonce())
return ErrNilExecutionResult
}

err = he.executionTracker.AddExecutionResult(executionResult)
if err != nil {
log.Warn("headersExecutor.process add execution result failed",
"nonce", pair.Header.GetNonce(),
"err", err,
)
return nil
return err
}

he.blockChain.SetFinalBlockInfo(
Expand Down
9 changes: 7 additions & 2 deletions process/asyncExecution/headersExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ func TestHeadersExecutor_ProcessBlock(t *testing.T) {
blocksQueue := queue.NewBlocksQueue()
args.BlocksQueue = blocksQueue
wasAddExecutionResultCalled := atomicCore.Flag{}
args.BlockProcessor = &processMocks.BlockProcessorStub{
ProcessBlockProposalCalled: func(handler data.HeaderHandler, body data.BodyHandler) (data.BaseExecutionResultHandler, error) {
return &block.BaseExecutionResult{}, nil
},
}
args.ExecutionTracker = &processMocks.ExecutionTrackerStub{
AddExecutionResultCalled: func(executionResult data.BaseExecutionResultHandler) error {
wasAddExecutionResultCalled.SetValue(true)
Expand Down Expand Up @@ -393,7 +398,7 @@ func TestHeadersExecutor_Process(t *testing.T) {
require.Equal(t, expectedErr, err)
})

t.Run("should return nil on failing to add execution results to execution tracker", func(t *testing.T) {
t.Run("should return error on failing to add execution results to execution tracker", func(t *testing.T) {
t.Parallel()

args := createMockArgs()
Expand Down Expand Up @@ -422,7 +427,7 @@ func TestHeadersExecutor_Process(t *testing.T) {
}

err := executor.Process(pair)
require.Nil(t, err)
require.Equal(t, expectedErr, err)
})

t.Run("should add execution result info to blockchain handler", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions process/asyncExecution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type BlocksQueue interface {
Pop() (queue.HeaderBodyPair, bool)
Peek() (queue.HeaderBodyPair, bool)
ValidateQueueIntegrity() error
IsInterfaceNil() bool
Close()
}
Expand Down
Loading
Loading