Skip to content

Commit 9d3633c

Browse files
committed
use synctest to fix flakiness in pipeline tests
1 parent 7ead0fb commit 9d3633c

File tree

1 file changed

+153
-103
lines changed

1 file changed

+153
-103
lines changed

module/executiondatasync/optimistic_sync/pipeline_test.go

Lines changed: 153 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import (
44
"context"
55
"errors"
66
"testing"
7+
"testing/synctest"
78
"time"
89

910
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/mock"
12+
"github.com/stretchr/testify/require"
1113

1214
osmock "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/mock"
1315
"github.com/onflow/flow-go/utils/unittest"
@@ -16,109 +18,134 @@ import (
1618
// TestPipelineStateTransitions verifies that the pipeline correctly transitions
1719
// through states when provided with the correct conditions.
1820
func TestPipelineStateTransitions(t *testing.T) {
19-
pipeline, mockCore, updateChan, parent := createPipeline(t)
21+
synctest.Test(t, func(t *testing.T) {
22+
pipeline, mockCore, updateChan, parent := createPipeline(t)
2023

21-
pipeline.SetSealed()
22-
parent.UpdateState(StateComplete, pipeline)
24+
pipeline.SetSealed()
25+
parent.UpdateState(StateComplete, pipeline)
2326

24-
mockCore.On("Download", mock.Anything).Return(nil)
25-
mockCore.On("Index").Return(nil)
26-
mockCore.On("Persist").Return(nil)
27+
mockCore.On("Download", mock.Anything).Return(nil)
28+
mockCore.On("Index").Return(nil)
29+
mockCore.On("Persist").Return(nil)
2730

28-
assert.Equal(t, StatePending, pipeline.GetState(), "Pipeline should start in Pending state")
31+
assert.Equal(t, StatePending, pipeline.GetState(), "Pipeline should start in Pending state")
2932

30-
errChan := make(chan error)
31-
go func() {
32-
errChan <- pipeline.Run(context.Background(), mockCore, parent.GetState())
33-
}()
33+
go func() {
34+
err := pipeline.Run(context.Background(), mockCore, parent.GetState())
35+
require.NoError(t, err)
36+
}()
3437

35-
// Wait for pipeline to reach WaitingPersist state
36-
expectedStates := []State{StateProcessing, StateWaitingPersist, StateComplete}
37-
waitForStateUpdates(t, updateChan, errChan, expectedStates...)
38-
assert.Equal(t, StateComplete, pipeline.GetState(), "Pipeline should be in Complete state")
38+
for _, expected := range []State{StateProcessing, StateWaitingPersist, StateComplete} {
39+
synctest.Wait()
40+
update := <-updateChan
41+
assert.Equalf(t, expected, update, "expected pipeline to transition to %s, but got %s", expected, update)
42+
}
3943

40-
// Run should complete without error
41-
waitForError(t, errChan, nil)
44+
// wait for Run goroutine to finish
45+
synctest.Wait()
46+
})
4247
}
4348

4449
// TestPipelineParentDependentTransitions verifies that a pipeline's transitions
4550
// depend on the parent pipeline's state.
4651
func TestPipelineParentDependentTransitions(t *testing.T) {
47-
pipeline, mockCore, updateChan, parent := createPipeline(t)
52+
synctest.Test(t, func(t *testing.T) {
53+
pipeline, mockCore, updateChan, parent := createPipeline(t)
4854

49-
mockCore.On("Download", mock.Anything).Return(nil)
50-
mockCore.On("Index").Return(nil)
51-
mockCore.On("Persist").Return(nil)
55+
assert.Equal(t, StatePending, pipeline.GetState(), "Pipeline should start in Pending state")
5256

53-
assert.Equal(t, StatePending, pipeline.GetState(), "Pipeline should start in Pending state")
57+
go func() {
58+
err := pipeline.Run(context.Background(), mockCore, parent.GetState())
59+
require.NoError(t, err)
60+
}()
5461

55-
errChan := make(chan error)
56-
go func() {
57-
errChan <- pipeline.Run(context.Background(), mockCore, parent.GetState())
58-
}()
62+
// 1. Initial update - parent in Ready state
63+
parent.UpdateState(StatePending, pipeline)
5964

60-
// Initial update - parent in Ready state
61-
parent.UpdateState(StatePending, pipeline)
65+
synctest.Wait()
6266

63-
// Check that pipeline remains in Ready state
64-
waitNeverStateUpdate(t, updateChan, errChan)
65-
assert.Equal(t, StatePending, pipeline.GetState(), "Pipeline should start in Ready state")
66-
mockCore.AssertNotCalled(t, "Download")
67+
// Check that pipeline remains in Ready state
68+
assertNoUpdate(t, updateChan)
69+
assert.Equal(t, StatePending, pipeline.GetState(), "Pipeline should start in Ready state")
6770

68-
// Update parent to downloading
69-
parent.UpdateState(StateProcessing, pipeline)
71+
// 2. Update parent to downloading
72+
parent.UpdateState(StateProcessing, pipeline)
7073

71-
// Pipeline should now progress to WaitingPersist state and stop
72-
expectedStates := []State{StateProcessing, StateWaitingPersist}
73-
waitForStateUpdates(t, updateChan, errChan, expectedStates...)
74-
assert.Equal(t, StateWaitingPersist, pipeline.GetState(), "Pipeline should progress to WaitingPersist state")
75-
mockCore.AssertCalled(t, "Download", mock.Anything)
76-
mockCore.AssertCalled(t, "Index")
77-
mockCore.AssertNotCalled(t, "Persist")
74+
// Pipeline should now call Download and Index within the processing state, then progress to
75+
// WaitingPersist and stop
76+
mockCore.On("Download", mock.Anything).Return(nil)
77+
mockCore.On("Index").Return(nil)
78+
for _, expected := range []State{StateProcessing, StateWaitingPersist} {
79+
synctest.Wait()
80+
update := <-updateChan
81+
assert.Equal(t, expected, update, "Pipeline should progress to %s state", expected)
82+
}
7883

79-
waitNeverStateUpdate(t, updateChan, errChan)
80-
assert.Equal(t, StateWaitingPersist, pipeline.GetState(), "Pipeline should remain in WaitingPersist state")
84+
// pipeline should remain in WaitingPersist state
85+
synctest.Wait()
86+
assertNoUpdate(t, updateChan)
87+
assert.Equal(t, StateWaitingPersist, pipeline.GetState(), "Pipeline should remain in WaitingPersist state")
8188

82-
// Update parent to complete - should allow persisting when sealed
83-
parent.UpdateState(StateComplete, pipeline)
89+
// 3. Update parent to complete - should allow persisting when sealed
90+
parent.UpdateState(StateComplete, pipeline)
8491

85-
// this alone should not allow the pipeline to progress to any other state
86-
waitNeverStateUpdate(t, updateChan, errChan)
87-
assert.Equal(t, StateWaitingPersist, pipeline.GetState(), "Pipeline should remain in WaitingPersist state")
92+
// this alone should not allow the pipeline to progress to any other state
93+
synctest.Wait()
94+
assertNoUpdate(t, updateChan)
95+
assert.Equal(t, StateWaitingPersist, pipeline.GetState(), "Pipeline should remain in WaitingPersist state")
8896

89-
// Mark the execution result as sealed, this should allow the pipeline to progress to Complete state
90-
pipeline.SetSealed()
97+
// 4. Mark the execution result as sealed, this should allow the pipeline to progress to Complete state
98+
pipeline.SetSealed()
99+
mockCore.On("Persist").Return(nil)
91100

92-
// Wait for pipeline to complete
93-
expectedStates = []State{StateComplete}
94-
waitForStateUpdates(t, updateChan, errChan, expectedStates...)
95-
assert.Equal(t, StateComplete, pipeline.GetState(), "Pipeline should reach Complete state")
96-
mockCore.AssertCalled(t, "Persist")
101+
// Wait for pipeline to complete
102+
synctest.Wait()
103+
update := <-updateChan
104+
assert.Equal(t, StateComplete, update, "Pipeline should remain in WaitingPersist state")
97105

98-
// Run should complete without error
99-
waitForError(t, errChan, nil)
106+
synctest.Wait()
107+
assertNoUpdate(t, updateChan)
108+
assert.Equal(t, StateComplete, pipeline.GetState(), "Pipeline should reach Complete state")
109+
110+
// Run should complete without error
111+
synctest.Wait()
112+
})
113+
}
114+
115+
func assertNoUpdate(t *testing.T, updateChan <-chan State) {
116+
select {
117+
case update := <-updateChan:
118+
t.Fatalf("Pipeline should not have transitioned to any state: got %s", update)
119+
default:
120+
}
100121
}
101122

102123
// TestParentAbandoned verifies that a pipeline is properly abandoned when
103124
// the parent pipeline is abandoned.
104125
func TestAbandoned(t *testing.T) {
105126
t.Run("starts already abandoned", func(t *testing.T) {
106-
pipeline, mockCore, updateChan, parent := createPipeline(t)
127+
synctest.Test(t, func(t *testing.T) {
128+
pipeline, mockCore, updateChan, parent := createPipeline(t)
107129

108-
mockCore.On("Abandon").Return(nil)
130+
mockCore.On("Abandon").Return(nil)
109131

110-
pipeline.Abandon()
132+
pipeline.Abandon()
111133

112-
errChan := make(chan error)
113-
go func() {
114-
errChan <- pipeline.Run(context.Background(), mockCore, parent.GetState())
115-
}()
134+
go func() {
135+
err := pipeline.Run(context.Background(), mockCore, parent.GetState())
136+
require.NoError(t, err)
137+
}()
116138

117-
// first state must be abandoned
118-
waitForStateUpdates(t, updateChan, errChan, StateAbandoned)
139+
// first state must be abandoned
140+
synctest.Wait()
141+
update := <-updateChan
142+
assert.Equal(t, StateAbandoned, update, "Pipeline should transition to Abandoned state")
119143

120-
// Run should complete without error
121-
waitForError(t, errChan, nil)
144+
// wait for Run goroutine to finish
145+
synctest.Wait()
146+
assertNoUpdate(t, updateChan)
147+
assert.Equal(t, StateAbandoned, pipeline.GetState(), "Pipeline should remain in Abandoned state")
148+
})
122149
})
123150

124151
// Test cases abandoning during different stages of processing
@@ -211,22 +238,31 @@ func TestAbandoned(t *testing.T) {
211238

212239
for _, tc := range testCases {
213240
t.Run(tc.name, func(t *testing.T) {
214-
pipeline, mockCore, updateChan, parent := createPipeline(t)
215-
tc.setupMock(pipeline, parent, mockCore)
216-
217-
mockCore.On("Abandon").Return(nil)
218-
219-
errChan := make(chan error)
220-
go func() {
221-
errChan <- pipeline.Run(context.Background(), mockCore, parent.GetState())
222-
}()
223-
224-
// Send parent update to start processing
225-
parent.UpdateState(StateProcessing, pipeline)
226-
227-
waitForStateUpdates(t, updateChan, errChan, tc.expectedStates...)
228-
229-
waitForError(t, errChan, nil)
241+
synctest.Test(t, func(t *testing.T) {
242+
pipeline, mockCore, updateChan, parent := createPipeline(t)
243+
tc.setupMock(pipeline, parent, mockCore)
244+
245+
mockCore.On("Abandon").Return(nil)
246+
247+
go func() {
248+
err := pipeline.Run(context.Background(), mockCore, parent.GetState())
249+
require.NoError(t, err)
250+
}()
251+
252+
// Send parent update to start processing
253+
parent.UpdateState(StateProcessing, pipeline)
254+
255+
for _, expected := range tc.expectedStates {
256+
synctest.Wait()
257+
update := <-updateChan
258+
assert.Equal(t, expected, update, "Pipeline should progress to %s state", expected)
259+
}
260+
261+
// wait for Run goroutine to finish
262+
synctest.Wait()
263+
assertNoUpdate(t, updateChan)
264+
assert.Equal(t, StateAbandoned, pipeline.GetState(), "Pipeline should remain in Abandoned state")
265+
})
230266
})
231267
}
232268
}
@@ -278,19 +314,22 @@ func TestPipelineContextCancellation(t *testing.T) {
278314

279315
for _, tc := range testCases {
280316
t.Run(tc.name, func(t *testing.T) {
281-
pipeline, mockCore, _, parent := createPipeline(t)
317+
synctest.Test(t, func(t *testing.T) {
318+
pipeline, mockCore, _, parent := createPipeline(t)
282319

283-
parent.UpdateState(StateComplete, pipeline)
284-
pipeline.SetSealed()
320+
parent.UpdateState(StateComplete, pipeline)
321+
pipeline.SetSealed()
285322

286-
ctx := tc.setupMock(pipeline, parent, mockCore)
323+
ctx := tc.setupMock(pipeline, parent, mockCore)
287324

288-
errChan := make(chan error)
289-
go func() {
290-
errChan <- pipeline.Run(ctx, mockCore, parent.GetState())
291-
}()
325+
go func() {
326+
err := pipeline.Run(ctx, mockCore, parent.GetState())
327+
require.ErrorIs(t, err, context.Canceled)
328+
}()
292329

293-
waitForError(t, errChan, context.Canceled)
330+
// wait for Run goroutine to finish
331+
synctest.Wait()
332+
})
294333
})
295334
}
296335
}
@@ -339,21 +378,32 @@ func TestPipelineErrorHandling(t *testing.T) {
339378

340379
for _, tc := range testCases {
341380
t.Run(tc.name, func(t *testing.T) {
342-
pipeline, mockCore, updateChan, parent := createPipeline(t)
381+
synctest.Test(t, func(t *testing.T) {
382+
pipeline, mockCore, updateChan, parent := createPipeline(t)
343383

344-
tc.setupMock(pipeline, parent, mockCore, tc.expectedErr)
384+
tc.setupMock(pipeline, parent, mockCore, tc.expectedErr)
345385

346-
errChan := make(chan error)
347-
go func() {
348-
errChan <- pipeline.Run(context.Background(), mockCore, parent.GetState())
349-
}()
386+
go func() {
387+
err := pipeline.Run(context.Background(), mockCore, parent.GetState())
388+
require.ErrorIs(t, err, tc.expectedErr)
389+
}()
390+
391+
// Send parent update to trigger processing
392+
parent.UpdateState(StateProcessing, pipeline)
350393

351-
// Send parent update to trigger processing
352-
parent.UpdateState(StateProcessing, pipeline)
394+
for _, expected := range tc.expectedStates {
395+
synctest.Wait()
396+
update := <-updateChan
397+
assert.Equal(t, expected, update, "Pipeline should progress to %s state", expected)
398+
}
353399

354-
waitForStateUpdates(t, updateChan, errChan, tc.expectedStates...)
400+
synctest.Wait()
401+
assertNoUpdate(t, updateChan)
402+
assert.Equal(t, tc.expectedStates[len(tc.expectedStates)-1], pipeline.GetState(), "Pipeline should remain in final state")
355403

356-
waitForError(t, errChan, tc.expectedErr)
404+
// wait for Run goroutine to finish
405+
synctest.Wait()
406+
})
357407
})
358408
}
359409
}

0 commit comments

Comments
 (0)