Skip to content

Commit 7015fff

Browse files
committed
fix flaky pipeline tests
1 parent b822550 commit 7015fff

File tree

3 files changed

+83
-19
lines changed

3 files changed

+83
-19
lines changed

module/executiondatasync/optimistic_sync/pipeline_functional_test.go

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"testing"
8+
"testing/synctest"
89
"time"
910

1011
"github.com/jordanschalm/lockctx"
@@ -436,9 +437,14 @@ func (p *PipelineFunctionalSuite) TestMainCtxCancellationDuringWaitingPersist()
436437

437438
// TestPipelineShutdownOnParentAbandon verifies that the pipeline transitions correctly to a shutdown state when the parent is abandoned.
438439
func (p *PipelineFunctionalSuite) TestPipelineShutdownOnParentAbandon() {
440+
assertNoError := func(err error) {
441+
p.Require().NoError(err)
442+
}
443+
439444
tests := []struct {
440445
name string
441446
config PipelineConfig
447+
checkError func(err error)
442448
customSetup func(pipeline Pipeline, updateChan chan State, errChan chan error)
443449
}{
444450
{
@@ -449,43 +455,68 @@ func (p *PipelineFunctionalSuite) TestPipelineShutdownOnParentAbandon() {
449455
},
450456
parentState: StateAbandoned,
451457
},
458+
checkError: assertNoError,
459+
customSetup: func(pipeline Pipeline, updateChan chan State, errChan chan error) {},
452460
},
453461
{
454462
name: "from StateProcessing",
463+
config: PipelineConfig{
464+
beforePipelineRun: func(pipeline *PipelineImpl) {
465+
p.execDataRequester.On("RequestExecutionData", mock.Anything).Return(func(ctx context.Context) (*execution_data.BlockExecutionData, error) {
466+
pipeline.OnParentStateUpdated(StateAbandoned) // abandon during processing step
467+
return p.expectedExecutionData, nil
468+
}).Once()
469+
// this method may not be called depending on how quickly the RequestExecutionData
470+
// mock returns.
471+
p.txResultErrMsgsRequester.On("Request", mock.Anything).Return(p.expectedTxResultErrMsgs, nil).Maybe()
472+
},
473+
parentState: StateWaitingPersist,
474+
},
475+
checkError: func(err error) {
476+
// depending on the timing, the error may be during or after the indexing step.
477+
if err != nil {
478+
p.Require().ErrorContains(err, "could not perform indexing")
479+
} else {
480+
p.Require().NoError(err)
481+
}
482+
},
455483
customSetup: func(pipeline Pipeline, updateChan chan State, errChan chan error) {
456-
waitForStateUpdates(p.T(), updateChan, errChan, StateProcessing)
457-
458-
pipeline.OnParentStateUpdated(StateAbandoned)
484+
synctestWaitForStateUpdates(p.T(), updateChan, StateProcessing)
459485
},
460-
config: p.config,
461486
},
462487
{
463488
name: "from StateWaitingPersist",
489+
config: PipelineConfig{
490+
beforePipelineRun: func(pipeline *PipelineImpl) {
491+
p.execDataRequester.On("RequestExecutionData", mock.Anything).Return(p.expectedExecutionData, nil).Once()
492+
p.txResultErrMsgsRequester.On("Request", mock.Anything).Return(p.expectedTxResultErrMsgs, nil).Once()
493+
},
494+
parentState: StateWaitingPersist,
495+
},
496+
checkError: assertNoError,
464497
customSetup: func(pipeline Pipeline, updateChan chan State, errChan chan error) {
465-
waitForStateUpdates(p.T(), updateChan, errChan, StateProcessing, StateWaitingPersist)
466-
498+
synctestWaitForStateUpdates(p.T(), updateChan, StateProcessing, StateWaitingPersist)
467499
pipeline.OnParentStateUpdated(StateAbandoned)
468500
},
469-
config: p.config,
470501
},
471502
}
472503

473504
for _, test := range tests {
474505
p.T().Run(test.name, func(t *testing.T) {
475-
p.WithRunningPipeline(func(pipeline Pipeline, updateChan chan State, errChan chan error, cancel context.CancelFunc) {
476-
p.execDataRequester.On("RequestExecutionData", mock.Anything).Return(p.expectedExecutionData, nil).Maybe()
477-
p.txResultErrMsgsRequester.On("Request", mock.Anything).Return(p.expectedTxResultErrMsgs, nil).Maybe()
506+
p.execDataRequester.On("RequestExecutionData", mock.Anything).Unset()
507+
p.txResultErrMsgsRequester.On("Request", mock.Anything).Unset()
478508

479-
if test.customSetup != nil {
509+
synctest.Test(p.T(), func(t *testing.T) {
510+
p.WithRunningPipeline(func(pipeline Pipeline, updateChan chan State, errChan chan error, cancel context.CancelFunc) {
480511
test.customSetup(pipeline, updateChan, errChan)
481-
}
482512

483-
waitForStateUpdates(p.T(), updateChan, errChan, StateAbandoned)
484-
waitForError(p.T(), errChan, nil)
513+
synctestWaitForStateUpdates(p.T(), updateChan, StateAbandoned)
514+
test.checkError(<-errChan)
485515

486-
p.Assert().Equal(StateAbandoned, pipeline.GetState())
487-
p.Assert().Nil(p.core.workingData)
488-
}, test.config)
516+
p.Assert().Equal(StateAbandoned, pipeline.GetState())
517+
p.Assert().Nil(p.core.workingData)
518+
}, test.config)
519+
})
489520
})
490521
}
491522
}
@@ -534,13 +565,18 @@ func (p *PipelineFunctionalSuite) WithRunningPipeline(
534565
pipelineIsReady := make(chan struct{})
535566

536567
go func() {
568+
defer close(errChan)
569+
537570
if pipelineConfig.beforePipelineRun != nil {
538571
pipelineConfig.beforePipelineRun(pipeline)
539572
}
540573

541574
close(pipelineIsReady)
542575

543-
errChan <- pipeline.Run(ctx, p.core, pipelineConfig.parentState)
576+
err := pipeline.Run(ctx, p.core, pipelineConfig.parentState)
577+
if err != nil {
578+
errChan <- err
579+
}
544580
}()
545581

546582
<-pipelineIsReady

module/executiondatasync/optimistic_sync/pipeline_test_utils.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package optimistic_sync
22

33
import (
44
"testing"
5+
"testing/synctest"
56
"time"
67

78
"github.com/rs/zerolog"
@@ -116,3 +117,28 @@ func createPipeline(t *testing.T) (*PipelineImpl, *osmock.Core, <-chan State, *m
116117

117118
return pipeline, mockCore, stateReceiver.updateChan, parent
118119
}
120+
121+
// synctestWaitForStateUpdates waits for a sequence of state updates to occur using synctest.Wait.
122+
// updates must be received in the correct order or the test will fail.
123+
// TODO: refactor all tests to use the synctest approach.
124+
func synctestWaitForStateUpdates(t *testing.T, updateChan <-chan State, expectedStates ...State) {
125+
for _, expected := range expectedStates {
126+
synctest.Wait()
127+
update, ok := <-updateChan
128+
require.True(t, ok, "update channel closed unexpectedly")
129+
assert.Equalf(t, expected, update, "expected pipeline to transition to %s, but got %s", expected, update)
130+
}
131+
}
132+
133+
// synctestWaitForError waits for an error from the errChan using synctest.Wait and asserts it matches the expected error.
134+
// TODO: refactor all tests to use the synctest approach.
135+
func synctestWaitForError(t *testing.T, errChan <-chan error, expectedErr error) {
136+
synctest.Wait()
137+
err := <-errChan
138+
if expectedErr == nil {
139+
assert.NoError(t, err, "Pipeline should complete without errors")
140+
} else {
141+
assert.ErrorIs(t, err, expectedErr)
142+
}
143+
144+
}

module/state_synchronization/indexer/indexer_core.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,14 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
222222
g.Go(func() error {
223223
start := time.Now()
224224

225-
// index all collections except the system chunk
226225
// Note: the access ingestion engine also indexes collections, starting when the block is
227226
// finalized. This process can fall behind due to the node being offline, resource issues
228227
// or network congestion. This indexer ensures that collections are never farther behind
229228
// than the latest indexed block. Calling the collection handler with a collection that
230229
// has already been indexed is a noop.
230+
231+
// index all collections except the system chunk. if there is only a single chunk, it is the
232+
// system chunk and can be skipped.
231233
indexedCount := 0
232234
if len(data.ChunkExecutionDatas) > 1 {
233235
for _, chunk := range data.ChunkExecutionDatas[0 : len(data.ChunkExecutionDatas)-1] {

0 commit comments

Comments
 (0)