Skip to content

Commit aebd60e

Browse files
authored
Merge pull request #7591 from multiversx/fix-race-execution
fixes exec queue add and pop
2 parents 09a3fcc + 0b6f2f8 commit aebd60e

File tree

18 files changed

+521
-45
lines changed

18 files changed

+521
-45
lines changed

consensus/spos/worker.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -864,11 +864,17 @@ func (wrk *Worker) Extend(subroundId int) {
864864
time.Sleep(time.Millisecond)
865865
}
866866

867+
log.Debug("current block is reverted")
868+
869+
header := wrk.consensusState.GetHeader()
870+
isHeaderV3 := !check.IfNil(header) && header.IsHeaderV3()
871+
if isHeaderV3 {
872+
return
873+
}
874+
867875
wrk.scheduledProcessor.ForceStopScheduledExecutionBlocking()
868-
wrk.blockProcessor.RevertCurrentBlock(wrk.consensusState.GetHeader())
876+
wrk.blockProcessor.RevertCurrentBlock(header)
869877
wrk.removeConsensusHeaderFromPool()
870-
871-
log.Debug("current block is reverted")
872878
}
873879

874880
func (wrk *Worker) removeConsensusHeaderFromPool() {

epochStart/metachain/economics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,7 @@ func getPrevEpochMetaStartNonceForEconomics(previousEpochStartMeta data.MetaHead
647647
if !previousEpochStartMeta.IsHeaderV3() {
648648
return previousEpochStartMeta.GetNonce(), nil
649649
}
650+
// todo: extract the epoch change proposal execution result here
650651
lastNotarizedResult, err := common.GetLastBaseExecutionResultHandler(previousEpochStartMeta)
651652
if err != nil {
652653
return 0, err

epochStart/metachain/epochStartData.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ func (e *epochStartData) CreateEpochStartData() (*block.EpochStart, error) {
183183
func (e *epochStartData) CreateEpochStartShardDataMetablockV3(metablock data.MetaHeaderHandler) ([]block.EpochStartShardData, error) {
184184
log.Debug("CreateEpochStartShardDataMetablockV3",
185185
"metablock epoch", metablock.GetEpoch(),
186+
"for epoch", metablock.GetEpoch()+1,
186187
"isEpochChangeProposed", metablock.IsEpochChangeProposed(),
187188
"trigger epoch", e.epochStartTrigger.Epoch())
188189

@@ -390,17 +391,34 @@ func (e *epochStartData) getShardDataFromEpochStartData(
390391
return nil, nil, process.ErrGettingShardDataFromEpochStartData
391392
}
392393

394+
func (e *epochStartData) getPrevEpoch() uint32 {
395+
prevEpoch := e.genesisEpoch
396+
397+
epochStartTriggerEpoch := e.epochStartTrigger.Epoch()
398+
399+
if epochStartTriggerEpoch <= e.genesisEpoch {
400+
return prevEpoch
401+
}
402+
403+
prevEpoch = e.epochStartTrigger.Epoch()
404+
405+
isAfterSupernova := epochStartTriggerEpoch > e.enableEpochsHandler.GetActivationEpoch(common.SupernovaFlag)
406+
if !isAfterSupernova {
407+
prevEpoch--
408+
}
409+
410+
return prevEpoch
411+
}
412+
393413
func (e *epochStartData) computePendingMiniBlockList(
394414
startData *block.EpochStart,
395415
allShardHdrList [][]data.HeaderHandler,
396416
) ([]block.MiniBlockHeader, error) {
397-
398-
prevEpoch := e.genesisEpoch
399-
if e.epochStartTrigger.Epoch() > e.genesisEpoch {
400-
prevEpoch = e.epochStartTrigger.Epoch() - 1
401-
}
417+
prevEpoch := e.getPrevEpoch()
402418

403419
epochStartIdentifier := core.EpochStartIdentifier(prevEpoch)
420+
421+
// TODO: analyse error handling here
404422
previousEpochStartMeta, _ := process.GetMetaHeaderFromStorage([]byte(epochStartIdentifier), e.marshalizer, e.store)
405423

406424
allPending := make([]block.MiniBlockHeader, 0)

epochStart/metachain/trigger.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (t *trigger) Update(round uint64, nonce uint64) {
261261
}
262262

263263
if t.shouldTriggerEpochStart(round, nonce) {
264-
t.setEpochChange(round)
264+
t.setEpochChange(round, t.epoch+1)
265265
}
266266
}
267267

@@ -270,11 +270,11 @@ func (t *trigger) SetEpochChange(round uint64) {
270270
t.mutTrigger.Lock()
271271
defer t.mutTrigger.Unlock()
272272

273-
t.setEpochChange(round)
273+
t.setEpochChange(round, t.epoch+1)
274274
}
275275

276-
func (t *trigger) setEpochChange(round uint64) {
277-
t.epoch += 1
276+
func (t *trigger) setEpochChange(round uint64, epoch uint32) {
277+
t.epoch = epoch
278278
t.isEpochStart = true
279279
t.prevEpochStartRound = t.currEpochStartRound
280280
t.currEpochStartRound = round
@@ -300,7 +300,7 @@ func (t *trigger) SetProcessed(header data.HeaderHandler, body data.BodyHandler)
300300
}
301301

302302
if header.IsHeaderV3() {
303-
t.setEpochChange(header.GetRound())
303+
t.setEpochChange(header.GetRound(), header.GetEpoch())
304304
}
305305

306306
metaBuff, errNotCritical := t.marshaller.Marshal(metaBlock)

process/asyncExecution/errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@ var ErrNilExecutionTracker = errors.New("nil execution tracker")
1010

1111
// ErrNilBlockProcessor signals that a nil block processor has been provided
1212
var ErrNilBlockProcessor = errors.New("nil block processor")
13+
14+
// ErrNilExecutionResult signals that a nil execution result has been provided
15+
var ErrNilExecutionResult = errors.New("nil execution result")

process/asyncExecution/headersExecutor.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ var log = logger.GetOrCreate("process/asyncExecution")
1717

1818
const timeToSleep = time.Millisecond * 5
1919
const timeToSleepOnError = time.Millisecond * 300
20+
const maxRetryAttempts = 10
21+
const maxBackoffTime = time.Second * 5
22+
const validationInterval = time.Minute * 1
2023

2124
// ArgsHeadersExecutor holds all the components needed to create a new instance of *headersExecutor
2225
type ArgsHeadersExecutor struct {
@@ -72,6 +75,8 @@ func (he *headersExecutor) StartExecution() {
7275

7376
// PauseExecution pauses the execution
7477
func (he *headersExecutor) PauseExecution() {
78+
log.Debug("headersExecutor.PauseExecution: pausing execution")
79+
7580
he.mutPaused.Lock()
7681
defer he.mutPaused.Unlock()
7782

@@ -84,6 +89,8 @@ func (he *headersExecutor) PauseExecution() {
8489

8590
// ResumeExecution resumes the execution
8691
func (he *headersExecutor) ResumeExecution() {
92+
log.Debug("headersExecutor.ResumeExecution: resuming execution")
93+
8794
he.mutPaused.Lock()
8895
defer he.mutPaused.Unlock()
8996

@@ -93,10 +100,19 @@ func (he *headersExecutor) ResumeExecution() {
93100
func (he *headersExecutor) start(ctx context.Context) {
94101
log.Debug("headersExecutor.start: starting execution")
95102

103+
validationTicker := time.NewTicker(validationInterval)
104+
defer validationTicker.Stop()
105+
96106
for {
97107
select {
98108
case <-ctx.Done():
99109
return
110+
case <-validationTicker.C:
111+
// Periodic queue validation
112+
err := he.blocksQueue.ValidateQueueIntegrity()
113+
if err != nil {
114+
log.Error("headersExecutor.start: queue integrity validation failed", "err", err)
115+
}
100116
default:
101117
he.mutPaused.RLock()
102118
isPaused := he.isPaused
@@ -129,43 +145,74 @@ func (he *headersExecutor) start(ctx context.Context) {
129145
}
130146

131147
func (he *headersExecutor) handleProcessError(ctx context.Context, pair queue.HeaderBodyPair) {
132-
for {
148+
retryCount := 0
149+
backoffTime := timeToSleepOnError
150+
151+
for retryCount < maxRetryAttempts {
133152
pairFromQueue, ok := he.blocksQueue.Peek()
134153
if ok && pairFromQueue.Header.GetNonce() == pair.Header.GetNonce() {
135154
// continue the processing (pop the next header from queue)
136155
return
137156
}
157+
138158
select {
139159
case <-ctx.Done():
140160
return
141161
default:
142162
// retry with the same pair
143163
err := he.process(pair)
144164
if err == nil {
165+
log.Debug("headersExecutor.handleProcessError - retry succeeded",
166+
"nonce", pair.Header.GetNonce(),
167+
"retry_count", retryCount)
145168
return
146169
}
147-
time.Sleep(timeToSleepOnError)
170+
retryCount++
171+
log.Warn("headersExecutor.handleProcessError - retry failed",
172+
"nonce", pair.Header.GetNonce(),
173+
"retry_count", retryCount,
174+
"max_retries", maxRetryAttempts,
175+
"err", err)
176+
177+
// Exponential backoff with maximum limit
178+
time.Sleep(backoffTime)
179+
backoffTime = backoffTime * 2
180+
if backoffTime > maxBackoffTime {
181+
backoffTime = maxBackoffTime
182+
}
148183
}
149184
}
185+
186+
log.Error("headersExecutor.handleProcessError - max retries exceeded, skipping block",
187+
"nonce", pair.Header.GetNonce(),
188+
"max_retries", maxRetryAttempts)
150189
}
151190

152191
func (he *headersExecutor) process(pair queue.HeaderBodyPair) error {
153192
executionResult, err := he.blockProcessor.ProcessBlockProposal(pair.Header, pair.Body)
154193
if err != nil {
155194
log.Warn("headersExecutor.process process block failed",
156195
"nonce", pair.Header.GetNonce(),
196+
"prevHash", pair.Header.GetPrevHash(),
157197
"err", err,
158198
)
159199
return err
160200
}
161201

202+
// Validate execution result
203+
if check.IfNil(executionResult) {
204+
log.Warn("headersExecutor.process - nil execution result received",
205+
"nonce", pair.Header.GetNonce())
206+
return ErrNilExecutionResult
207+
}
208+
162209
err = he.executionTracker.AddExecutionResult(executionResult)
163210
if err != nil {
164211
log.Warn("headersExecutor.process add execution result failed",
165212
"nonce", pair.Header.GetNonce(),
166213
"err", err,
167214
)
168-
return nil
215+
return err
169216
}
170217

171218
he.blockChain.SetFinalBlockInfo(

process/asyncExecution/headersExecutor_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,11 @@ func TestHeadersExecutor_ProcessBlock(t *testing.T) {
228228
blocksQueue := queue.NewBlocksQueue()
229229
args.BlocksQueue = blocksQueue
230230
wasAddExecutionResultCalled := atomicCore.Flag{}
231+
args.BlockProcessor = &processMocks.BlockProcessorStub{
232+
ProcessBlockProposalCalled: func(handler data.HeaderHandler, body data.BodyHandler) (data.BaseExecutionResultHandler, error) {
233+
return &block.BaseExecutionResult{}, nil
234+
},
235+
}
231236
args.ExecutionTracker = &processMocks.ExecutionTrackerStub{
232237
AddExecutionResultCalled: func(executionResult data.BaseExecutionResultHandler) error {
233238
wasAddExecutionResultCalled.SetValue(true)
@@ -393,7 +398,7 @@ func TestHeadersExecutor_Process(t *testing.T) {
393398
require.Equal(t, expectedErr, err)
394399
})
395400

396-
t.Run("should return nil on failing to add execution results to execution tracker", func(t *testing.T) {
401+
t.Run("should return error on failing to add execution results to execution tracker", func(t *testing.T) {
397402
t.Parallel()
398403

399404
args := createMockArgs()
@@ -422,7 +427,7 @@ func TestHeadersExecutor_Process(t *testing.T) {
422427
}
423428

424429
err := executor.Process(pair)
425-
require.Nil(t, err)
430+
require.Equal(t, expectedErr, err)
426431
})
427432

428433
t.Run("should add execution result info to blockchain handler", func(t *testing.T) {

process/asyncExecution/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
type BlocksQueue interface {
1111
// Pop returns a header/body pair together with a bool flag.
// NOTE(review): presumably removes the pair from the head of the queue and
// reports false when no pair could be returned - confirm against the implementation.
Pop() (queue.HeaderBodyPair, bool)
1212
// Peek returns a header/body pair together with a bool flag; the headers
// executor only uses the returned pair when the flag is true.
// NOTE(review): presumably returns the head of the queue without removing it - confirm.
Peek() (queue.HeaderBodyPair, bool)
13+
// ValidateQueueIntegrity returns a non-nil error when the check fails; the
// headers executor calls it periodically and logs any returned error.
// NOTE(review): the exact invariants being checked are not visible here - confirm.
ValidateQueueIntegrity() error
1314
// IsInterfaceNil returns a bool.
// NOTE(review): presumably true when there is no value under the interface - confirm.
IsInterfaceNil() bool
1415
// Close takes no arguments and returns nothing.
// NOTE(review): presumably releases the queue's resources - confirm.
Close()
1516
}

0 commit comments

Comments
 (0)