Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4a3db54
fixes exec queue add and pop
AdoAdoAdo Jan 5, 2026
60655ce
fixes
AdoAdoAdo Jan 8, 2026
207937d
clean channel only when queue becomes empty
AdoAdoAdo Jan 8, 2026
82892b5
Merge branch 'feat/supernova-async-exec' into fix-race-execution
sstanculeanu Jan 8, 2026
09ba7f3
Merge branch 'feat/supernova-async-exec' into fix-race-execution
AdoAdoAdo Jan 8, 2026
d1b5c2c
fix trigger epoch start on boot
ssd04 Jan 8, 2026
4e40327
Merge branch 'feat/supernova-async-exec' into fix-race-execution
AdoAdoAdo Jan 9, 2026
aa30a9b
do not early return if one expected epoch not found
ssd04 Jan 9, 2026
c4a2a02
fix consensus revert
sstanculeanu Jan 12, 2026
dafbec8
Merge branch 'feat/supernova-async-exec' into fix-race-execution
AdoAdoAdo Jan 12, 2026
839b187
Merge pull request #7596 from multiversx/fix-trigger-epoch-start-on-boot
AdoAdoAdo Jan 12, 2026
7cfed02
Merge pull request #7600 from multiversx/fix-consensus-revert
AdoAdoAdo Jan 12, 2026
9c8c1ef
fix consensus state nil header
sstanculeanu Jan 12, 2026
45b9329
fix pending mini blocks computation for epoch start data on v3 metablock
AdoAdoAdo Jan 12, 2026
47f3ab5
Merge pull request #7602 from multiversx/fix-nil-cns-state-header
sstanculeanu Jan 12, 2026
bdab59b
Merge remote-tracking branch 'origin/fix-race-execution' into epoch-c…
AdoAdoAdo Jan 12, 2026
944b95f
fix unit test
AdoAdoAdo Jan 12, 2026
cf0042e
fix unit test
AdoAdoAdo Jan 13, 2026
06fe612
Merge branch 'feat/supernova-async-exec' into fix-race-execution
AdoAdoAdo Jan 13, 2026
7112d80
Merge remote-tracking branch 'origin/fix-race-execution' into epoch-c…
AdoAdoAdo Jan 13, 2026
0b6f2f8
Merge pull request #7603 from multiversx/epoch-change-fixes
AdoAdoAdo Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions process/asyncExecution/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ var ErrNilExecutionTracker = errors.New("nil execution tracker")

// ErrNilBlockProcessor signals that a nil block processor has been provided
var ErrNilBlockProcessor = errors.New("nil block processor")

// ErrNilExecutionResult signals that a nil execution result has been provided
var ErrNilExecutionResult = errors.New("nil execution result")
53 changes: 50 additions & 3 deletions process/asyncExecution/headersExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ var log = logger.GetOrCreate("process/asyncExecution")

const timeToSleep = time.Millisecond * 5
const timeToSleepOnError = time.Millisecond * 300
const maxRetryAttempts = 10
const maxBackoffTime = time.Second * 5
const validationInterval = time.Minute * 1

// ArgsHeadersExecutor holds all the components needed to create a new instance of *headersExecutor
type ArgsHeadersExecutor struct {
Expand Down Expand Up @@ -72,6 +75,8 @@ func (he *headersExecutor) StartExecution() {

// PauseExecution pauses the execution
func (he *headersExecutor) PauseExecution() {
log.Debug("headersExecutor.PauseExecution: pausing execution")

he.mutPaused.Lock()
defer he.mutPaused.Unlock()

Expand All @@ -84,6 +89,8 @@ func (he *headersExecutor) PauseExecution() {

// ResumeExecution resumes the execution
func (he *headersExecutor) ResumeExecution() {
log.Debug("headersExecutor.ResumeExecution: resuming execution")

he.mutPaused.Lock()
defer he.mutPaused.Unlock()

Expand All @@ -93,10 +100,19 @@ func (he *headersExecutor) ResumeExecution() {
func (he *headersExecutor) start(ctx context.Context) {
log.Debug("headersExecutor.start: starting execution")

validationTicker := time.NewTicker(validationInterval)
defer validationTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-validationTicker.C:
// Periodic queue validation
err := he.blocksQueue.ValidateQueueIntegrity()
if err != nil {
log.Error("headersExecutor.start: queue integrity validation failed", "err", err)
}
default:
he.mutPaused.RLock()
isPaused := he.isPaused
Expand Down Expand Up @@ -129,43 +145,74 @@ func (he *headersExecutor) start(ctx context.Context) {
}

func (he *headersExecutor) handleProcessError(ctx context.Context, pair queue.HeaderBodyPair) {
for {
retryCount := 0
backoffTime := timeToSleepOnError

for retryCount < maxRetryAttempts {
pairFromQueue, ok := he.blocksQueue.Peek()
if ok && pairFromQueue.Header.GetNonce() == pair.Header.GetNonce() {
// continue the processing (pop the next header from queue)
return
}

select {
case <-ctx.Done():
return
default:
// retry with the same pair
err := he.process(pair)
if err == nil {
log.Debug("headersExecutor.handleProcessError - retry succeeded",
"nonce", pair.Header.GetNonce(),
"retry_count", retryCount)
return
}
time.Sleep(timeToSleepOnError)
retryCount++
log.Warn("headersExecutor.handleProcessError - retry failed",
"nonce", pair.Header.GetNonce(),
"retry_count", retryCount,
"max_retries", maxRetryAttempts,
"err", err)

// Exponential backoff with maximum limit
time.Sleep(backoffTime)
backoffTime = backoffTime * 2
if backoffTime > maxBackoffTime {
backoffTime = maxBackoffTime
}
}
}

log.Error("headersExecutor.handleProcessError - max retries exceeded, skipping block",
"nonce", pair.Header.GetNonce(),
"max_retries", maxRetryAttempts)
}

func (he *headersExecutor) process(pair queue.HeaderBodyPair) error {
executionResult, err := he.blockProcessor.ProcessBlockProposal(pair.Header, pair.Body)
if err != nil {
log.Warn("headersExecutor.process process block failed",
"nonce", pair.Header.GetNonce(),
"prevHash", pair.Header.GetPrevHash(),
"err", err,
)
return err
}

// Validate execution result
if check.IfNil(executionResult) {
log.Warn("headersExecutor.process - nil execution result received",
"nonce", pair.Header.GetNonce())
return ErrNilExecutionResult
}

err = he.executionTracker.AddExecutionResult(executionResult)
if err != nil {
log.Warn("headersExecutor.process add execution result failed",
"nonce", pair.Header.GetNonce(),
"err", err,
)
return nil
return err
}

he.blockChain.SetFinalBlockInfo(
Expand Down
9 changes: 7 additions & 2 deletions process/asyncExecution/headersExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ func TestHeadersExecutor_ProcessBlock(t *testing.T) {
blocksQueue := queue.NewBlocksQueue()
args.BlocksQueue = blocksQueue
wasAddExecutionResultCalled := atomicCore.Flag{}
args.BlockProcessor = &processMocks.BlockProcessorStub{
ProcessBlockProposalCalled: func(handler data.HeaderHandler, body data.BodyHandler) (data.BaseExecutionResultHandler, error) {
return &block.BaseExecutionResult{}, nil
},
}
args.ExecutionTracker = &processMocks.ExecutionTrackerStub{
AddExecutionResultCalled: func(executionResult data.BaseExecutionResultHandler) error {
wasAddExecutionResultCalled.SetValue(true)
Expand Down Expand Up @@ -393,7 +398,7 @@ func TestHeadersExecutor_Process(t *testing.T) {
require.Equal(t, expectedErr, err)
})

t.Run("should return nil on failing to add execution results to execution tracker", func(t *testing.T) {
t.Run("should return error on failing to add execution results to execution tracker", func(t *testing.T) {
t.Parallel()

args := createMockArgs()
Expand Down Expand Up @@ -422,7 +427,7 @@ func TestHeadersExecutor_Process(t *testing.T) {
}

err := executor.Process(pair)
require.Nil(t, err)
require.Equal(t, expectedErr, err)
})

t.Run("should add execution result info to blockchain handler", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions process/asyncExecution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type BlocksQueue interface {
Pop() (queue.HeaderBodyPair, bool)
Peek() (queue.HeaderBodyPair, bool)
ValidateQueueIntegrity() error
IsInterfaceNil() bool
Close()
}
Expand Down
74 changes: 64 additions & 10 deletions process/asyncExecution/queue/blocksQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-go/common"
logger "github.com/multiversx/mx-chain-logger-go"

"github.com/multiversx/mx-chain-go/common"
)

var log = logger.GetOrCreate("process/asyncExecution/queue")
Expand Down Expand Up @@ -70,13 +71,12 @@ func (bq *blocksQueue) AddOrReplace(pair HeaderBodyPair) error {

log.Debug("blocksQueue.AddOrReplace - block has been added", "nonce", pair.Header.GetNonce(), "queue size", len(bq.headerBodyPairs))

if len(bq.headerBodyPairs) > 1 {
return nil
}

select {
case bq.notifyCh <- struct{}{}:
default:
// Notify waiting Pop() calls when the first item is added to an empty queue
if len(bq.headerBodyPairs) == 1 {
select {
case bq.notifyCh <- struct{}{}:
default:
}
}

return nil
Expand Down Expand Up @@ -108,6 +108,7 @@ func (bq *blocksQueue) removeFromNonce(nonce uint64) []uint64 {
return removedNonces
}

log.Debug("blocksQueue.removeFromNonce - removing from nonce", "nonce", nonce, "num removed", len(pairsToBeRemoved))
for _, pair := range pairsToBeRemoved {
removedNonces = append(removedNonces, pair.Header.GetNonce())
}
Expand Down Expand Up @@ -138,10 +139,21 @@ func (bq *blocksQueue) add(pair HeaderBodyPair) error {
// If the queue is empty, the method blocks until a new item is available.
func (bq *blocksQueue) Pop() (HeaderBodyPair, bool) {
bq.mutex.Lock()
if len(bq.headerBodyPairs) > 1 {
if len(bq.headerBodyPairs) >= 1 {
item := bq.headerBodyPairs[0]
bq.headerBodyPairs = bq.headerBodyPairs[1:]
// Clear any stale notification from the channel when we take the fast path
if len(bq.headerBodyPairs) > 0 {
bq.mutex.Unlock()
return item, true
}

select {
case <-bq.notifyCh:
default:
}
bq.mutex.Unlock()

return item, true
}
if bq.closed {
Expand Down Expand Up @@ -190,6 +202,8 @@ func (bq *blocksQueue) Peek() (HeaderBodyPair, bool) {
// RemoveAtNonceAndHigher removes the header-body pair at the specified nonce
// and all pairs with higher nonces from the queue
func (bq *blocksQueue) RemoveAtNonceAndHigher(nonce uint64) []uint64 {
log.Debug("blocksQueue.RemoveAtNonceAndHigher - removing from nonce and higher", "nonce", nonce)

bq.mutex.Lock()
defer bq.mutex.Unlock()

Expand All @@ -202,16 +216,54 @@ func (bq *blocksQueue) RemoveAtNonceAndHigher(nonce uint64) []uint64 {

func (bq *blocksQueue) updateLastAddedNonceBasedOnRemovingNonce(removingNonce uint64) {
if len(bq.headerBodyPairs) > 0 {
bq.lastAddedNonce = bq.headerBodyPairs[len(bq.headerBodyPairs)-1].Header.GetNonce()
lastNonce := bq.headerBodyPairs[len(bq.headerBodyPairs)-1].Header.GetNonce()
bq.lastAddedNonce = lastNonce
log.Debug("blocksQueue.updateLastAddedNonceBasedOnRemovingNonce - updated from queue",
"new_last_nonce", lastNonce,
"queue_size", len(bq.headerBodyPairs))
return
}

if removingNonce > 0 {
bq.lastAddedNonce = removingNonce - 1
log.Debug("blocksQueue.updateLastAddedNonceBasedOnRemovingNonce - calculated from removing nonce",
"removing_nonce", removingNonce,
"new_last_nonce", bq.lastAddedNonce)
return
}

bq.lastAddedNonce = 0
log.Debug("blocksQueue.updateLastAddedNonceBasedOnRemovingNonce - reset to 0")
}

// ValidateQueueIntegrity checks the queue for consistency issues
func (bq *blocksQueue) ValidateQueueIntegrity() error {
bq.mutex.Lock()
defer bq.mutex.Unlock()

if len(bq.headerBodyPairs) == 0 {
return nil
}

// Check that nonces are sequential
expectedNonce := bq.headerBodyPairs[0].Header.GetNonce()
for i, pair := range bq.headerBodyPairs {
currentNonce := pair.Header.GetNonce()
if currentNonce != expectedNonce {
return fmt.Errorf("%w: expected nonce %d at index %d, got %d", ErrQueueIntegrityViolation,
expectedNonce, i, currentNonce)
}
expectedNonce++
}

// Check that lastAddedNonce matches the last item
lastPair := bq.headerBodyPairs[len(bq.headerBodyPairs)-1]
if bq.lastAddedNonce != lastPair.Header.GetNonce() {
return fmt.Errorf("%w: lastAddedNonce %d doesn't match last pair nonce %d", ErrQueueIntegrityViolation,
bq.lastAddedNonce, lastPair.Header.GetNonce())
}

return nil
}

// Clean cleanup the queue and set the provided last added nonce
Expand All @@ -221,6 +273,8 @@ func (bq *blocksQueue) Clean(lastAddedNonce uint64) {

bq.headerBodyPairs = make([]HeaderBodyPair, 0)
bq.lastAddedNonce = lastAddedNonce

log.Debug("blocksQueue.Clean - queue cleaned", "last_added_nonce", lastAddedNonce)
}

// Close will close the queue
Expand Down
Loading
Loading