@@ -37,12 +37,12 @@ func TestPipelineStateTransitions(t *testing.T) {
3737
3838 for _ , expected := range []State {StateProcessing , StateWaitingPersist , StateComplete } {
3939 synctest .Wait ()
40- update := <- updateChan
41- assert .Equalf (t , expected , update , "expected pipeline to transition to %s, but got %s" , expected , update )
40+ assertUpdate (t , updateChan , expected )
4241 }
4342
4443 // wait for Run goroutine to finish
4544 synctest .Wait ()
45+ assertNoUpdate (t , pipeline , updateChan , StateComplete )
4646 })
4747}
4848
@@ -62,11 +62,9 @@ func TestPipelineParentDependentTransitions(t *testing.T) {
6262 // 1. Initial update - parent in Ready state
6363 parent .UpdateState (StatePending , pipeline )
6464
65- synctest .Wait ()
66-
6765 // Check that pipeline remains in Ready state
68- assertNoUpdate ( t , updateChan )
69- assert . Equal (t , StatePending , pipeline . GetState (), "Pipeline should start in Ready state" )
66+ synctest . Wait ( )
67+ assertNoUpdate (t , pipeline , updateChan , StatePending )
7068
7169 // 2. Update parent to downloading
7270 parent .UpdateState (StateProcessing , pipeline )
@@ -77,49 +75,35 @@ func TestPipelineParentDependentTransitions(t *testing.T) {
7775 mockCore .On ("Index" ).Return (nil )
7876 for _ , expected := range []State {StateProcessing , StateWaitingPersist } {
7977 synctest .Wait ()
80- update := <- updateChan
81- assert .Equal (t , expected , update , "Pipeline should progress to %s state" , expected )
78+ assertUpdate (t , updateChan , expected )
8279 }
80+ assert .Equal (t , StateWaitingPersist , pipeline .GetState (), "Pipeline should be in StateWaitingPersist state" )
8381
8482 // pipeline should remain in WaitingPersist state
8583 synctest .Wait ()
86- assertNoUpdate (t , updateChan )
87- assert .Equal (t , StateWaitingPersist , pipeline .GetState (), "Pipeline should remain in WaitingPersist state" )
84+ assertNoUpdate (t , pipeline , updateChan , StateWaitingPersist )
8885
8986 // 3. Update parent to complete - should allow persisting when sealed
9087 parent .UpdateState (StateComplete , pipeline )
9188
9289 // this alone should not allow the pipeline to progress to any other state
9390 synctest .Wait ()
94- assertNoUpdate (t , updateChan )
95- assert .Equal (t , StateWaitingPersist , pipeline .GetState (), "Pipeline should remain in WaitingPersist state" )
91+ assertNoUpdate (t , pipeline , updateChan , StateWaitingPersist )
9692
9793 // 4. Mark the execution result as sealed, this should allow the pipeline to progress to Complete state
9894 pipeline .SetSealed ()
9995 mockCore .On ("Persist" ).Return (nil )
10096
10197 // Wait for pipeline to complete
10298 synctest .Wait ()
103- update := <- updateChan
104- assert .Equal (t , StateComplete , update , "Pipeline should remain in WaitingPersist state" )
105-
106- synctest .Wait ()
107- assertNoUpdate (t , updateChan )
108- assert .Equal (t , StateComplete , pipeline .GetState (), "Pipeline should reach Complete state" )
99+ assertUpdate (t , updateChan , StateComplete )
109100
110101 // Run should complete without error
111102 synctest .Wait ()
103+ assertNoUpdate (t , pipeline , updateChan , StateComplete )
112104 })
113105}
114106
115- func assertNoUpdate (t * testing.T , updateChan <- chan State ) {
116- select {
117- case update := <- updateChan :
118- t .Fatalf ("Pipeline should not have transitioned to any state: got %s" , update )
119- default :
120- }
121- }
122-
123107// TestParentAbandoned verifies that a pipeline is properly abandoned when
124108// the parent pipeline is abandoned.
125109func TestAbandoned (t * testing.T ) {
@@ -138,47 +122,44 @@ func TestAbandoned(t *testing.T) {
138122
139123 // first state must be abandoned
140124 synctest .Wait ()
141- update := <- updateChan
142- assert .Equal (t , StateAbandoned , update , "Pipeline should transition to Abandoned state" )
125+ assertUpdate (t , updateChan , StateAbandoned )
143126
144127 // wait for Run goroutine to finish
145128 synctest .Wait ()
146- assertNoUpdate (t , updateChan )
147- assert .Equal (t , StateAbandoned , pipeline .GetState (), "Pipeline should remain in Abandoned state" )
129+ assertNoUpdate (t , pipeline , updateChan , StateAbandoned )
148130 })
149131 })
150132
151133 // Test cases abandoning during different stages of processing
152134 testCases := []struct {
153135 name string
154136 setupMock func (* PipelineImpl , * mockStateProvider , * osmock.Core )
137+ onStateFns map [State ]func (* PipelineImpl , * mockStateProvider )
155138 expectedStates []State
156139 }{
157140 {
158141 name : "Abandon during download" ,
159142 setupMock : func (pipeline * PipelineImpl , parent * mockStateProvider , mockCore * osmock.Core ) {
160- mockCore .On ("Download" , mock .Anything ).Run (func (args mock.Arguments ) {
161- pipeline .Abandon ()
162-
163- ctx := args [0 ].(context.Context )
164- unittest .RequireCloseBefore (t , ctx .Done (), 500 * time .Millisecond , "Abandon should cause context to be canceled" )
165- }).Return (func (ctx context.Context ) error {
166- return ctx .Err ()
167- })
143+ mockCore .
144+ On ("Download" , mock .Anything ).
145+ Return (func (ctx context.Context ) error {
146+ pipeline .Abandon ()
147+ <- ctx .Done () // abandon should cause context to be canceled
148+ return ctx .Err ()
149+ })
168150 },
169151 expectedStates : []State {StateProcessing , StateAbandoned },
170152 },
171153 {
172154 name : "Parent abandoned during download" ,
173155 setupMock : func (pipeline * PipelineImpl , parent * mockStateProvider , mockCore * osmock.Core ) {
174- mockCore .On ("Download" , mock .Anything ).Run (func (args mock.Arguments ) {
175- parent .UpdateState (StateAbandoned , pipeline )
176-
177- ctx := args [0 ].(context.Context )
178- unittest .RequireCloseBefore (t , ctx .Done (), 500 * time .Millisecond , "Abandon should cause context to be canceled" )
179- }).Return (func (ctx context.Context ) error {
180- return ctx .Err ()
181- })
156+ mockCore .
157+ On ("Download" , mock .Anything ).
158+ Return (func (ctx context.Context ) error {
159+ parent .UpdateState (StateAbandoned , pipeline )
160+ <- ctx .Done () // abandon should cause context to be canceled
161+ return ctx .Err ()
162+ })
182163 },
183164 expectedStates : []State {StateProcessing , StateAbandoned },
184165 },
@@ -208,25 +189,25 @@ func TestAbandoned(t *testing.T) {
208189 name : "Abandon during waiting to persist" ,
209190 setupMock : func (pipeline * PipelineImpl , parent * mockStateProvider , mockCore * osmock.Core ) {
210191 mockCore .On ("Download" , mock .Anything ).Return (nil )
211- mockCore .On ("Index" ).Run ( func ( args mock. Arguments ) {
212- go func () {
213- time . Sleep ( 100 * time . Millisecond )
214- pipeline . Abandon ()
215- } ()
216- }). Return ( nil )
192+ mockCore .On ("Index" ).Return ( nil )
193+ },
194+ onStateFns : map [ State ] func ( * PipelineImpl , * mockStateProvider ){
195+ StateWaitingPersist : func ( pipeline * PipelineImpl , parent * mockStateProvider ) {
196+ pipeline . Abandon ()
197+ },
217198 },
218199 expectedStates : []State {StateProcessing , StateWaitingPersist , StateAbandoned },
219200 },
220201 {
221202 name : "Parent abandoned during waiting to persist" ,
222203 setupMock : func (pipeline * PipelineImpl , parent * mockStateProvider , mockCore * osmock.Core ) {
223204 mockCore .On ("Download" , mock .Anything ).Return (nil )
224- mockCore .On ("Index" ).Run ( func ( args mock. Arguments ) {
225- go func () {
226- time . Sleep ( 100 * time . Millisecond )
227- parent . UpdateState ( StateAbandoned , pipeline )
228- }( )
229- }). Return ( nil )
205+ mockCore .On ("Index" ).Return ( nil )
206+ },
207+ onStateFns : map [ State ] func ( * PipelineImpl , * mockStateProvider ){
208+ StateWaitingPersist : func ( pipeline * PipelineImpl , parent * mockStateProvider ) {
209+ parent . UpdateState ( StateAbandoned , pipeline )
210+ },
230211 },
231212 expectedStates : []State {StateProcessing , StateWaitingPersist , StateAbandoned },
232213 },
@@ -254,14 +235,16 @@ func TestAbandoned(t *testing.T) {
254235
255236 for _ , expected := range tc .expectedStates {
256237 synctest .Wait ()
257- update := <- updateChan
258- assert .Equal (t , expected , update , "Pipeline should progress to %s state" , expected )
238+ assertUpdate (t , updateChan , expected )
239+
240+ if fn , ok := tc .onStateFns [expected ]; ok {
241+ fn (pipeline , parent )
242+ }
259243 }
260244
261245 // wait for Run goroutine to finish
262246 synctest .Wait ()
263- assertNoUpdate (t , updateChan )
264- assert .Equal (t , StateAbandoned , pipeline .GetState (), "Pipeline should remain in Abandoned state" )
247+ assertNoUpdate (t , pipeline , updateChan , StateAbandoned )
265248 })
266249 })
267250 }
@@ -393,16 +376,13 @@ func TestPipelineErrorHandling(t *testing.T) {
393376
394377 for _ , expected := range tc .expectedStates {
395378 synctest .Wait ()
396- update := <- updateChan
397- assert .Equal (t , expected , update , "Pipeline should progress to %s state" , expected )
379+ assertUpdate (t , updateChan , expected )
398380 }
399381
400- synctest .Wait ()
401- assertNoUpdate (t , updateChan )
402- assert .Equal (t , tc .expectedStates [len (tc .expectedStates )- 1 ], pipeline .GetState (), "Pipeline should remain in final state" )
403-
404382 // wait for Run goroutine to finish
405383 synctest .Wait ()
384+ finalState := tc .expectedStates [len (tc .expectedStates )- 1 ]
385+ assertNoUpdate (t , pipeline , updateChan , finalState )
406386 })
407387 })
408388 }
@@ -449,3 +429,21 @@ func TestValidateTransition(t *testing.T) {
449429 }
450430 }
451431}
432+
433+ func assertNoUpdate (t * testing.T , pipeline Pipeline , updateChan <- chan State , existingState State ) {
434+ select {
435+ case update := <- updateChan :
436+ t .Errorf ("Pipeline should remain in %s state, but transitioned to %s" , existingState , update )
437+ default :
438+ assert .Equal (t , existingState , pipeline .GetState (), "Pipeline should remain in %s state" , existingState )
439+ }
440+ }
441+
442+ func assertUpdate (t * testing.T , updateChan <- chan State , expected State ) {
443+ select {
444+ case update := <- updateChan :
445+ assert .Equal (t , expected , update , "Pipeline should transition to %s state" , expected )
446+ default :
447+ t .Errorf ("Pipeline should have transitioned to %s state" , expected )
448+ }
449+ }
0 commit comments