Skip to content

Commit 1d0a68a

Browse files
authored
Merge pull request #114 from linuxboot/refactor/decouple_step_state_and_test_runner
Make TestRunner's StepState an independent entity
2 parents 79cf322 + dd102f4 commit 1d0a68a

File tree

2 files changed

+244
-180
lines changed

2 files changed

+244
-180
lines changed

pkg/runner/step_state.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package runner
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"strconv"
7+
"sync"
8+
9+
"github.com/linuxboot/contest/pkg/event/testevent"
10+
"github.com/linuxboot/contest/pkg/target"
11+
"github.com/linuxboot/contest/pkg/test"
12+
"github.com/linuxboot/contest/pkg/xcontext"
13+
)
14+
15+
// stepState contains state associated with one state of the pipeline in TestRunner.
16+
type stepState struct {
17+
mu sync.Mutex
18+
cancel xcontext.CancelFunc
19+
20+
stepIndex int // Index of this step in the pipeline.
21+
sb test.TestStepBundle // The test bundle.
22+
23+
ev testevent.Emitter
24+
stepRunner *StepRunner
25+
addTarget AddTargetToStep
26+
stopped chan struct{}
27+
28+
resumeState json.RawMessage // Resume state passed to and returned by the Run method.
29+
resumeStateTargets []target.Target // Targets that were being processed during pause.
30+
resumeTargetsNotifiers map[string]ChanNotifier // resumeStateTargets targets results
31+
runErr error // Runner error, returned from Run() or an error condition detected by the reader.
32+
onError func(err error)
33+
}
34+
35+
func newStepState(
36+
stepIndex int,
37+
sb test.TestStepBundle,
38+
emitterFactory TestStepEventsEmitterFactory,
39+
resumeState json.RawMessage,
40+
resumeStateTargets []target.Target,
41+
onError func(err error),
42+
) *stepState {
43+
return &stepState{
44+
stepIndex: stepIndex,
45+
sb: sb,
46+
ev: emitterFactory.New(sb.TestStepLabel),
47+
stepRunner: NewStepRunner(),
48+
stopped: make(chan struct{}),
49+
resumeState: resumeState,
50+
resumeStateTargets: resumeStateTargets,
51+
onError: onError,
52+
}
53+
}
54+
55+
func (ss *stepState) Started() bool {
56+
return ss.stepRunner.Started()
57+
}
58+
59+
func (ss *stepState) Stop() {
60+
ss.stepRunner.Stop()
61+
}
62+
63+
func (ss *stepState) ForceStop() {
64+
ss.mu.Lock()
65+
defer ss.mu.Unlock()
66+
67+
if ss.cancel != nil {
68+
ss.cancel()
69+
} else {
70+
close(ss.stopped)
71+
}
72+
}
73+
74+
func (ss *stepState) GetInitResumeState() json.RawMessage {
75+
return ss.resumeState
76+
}
77+
78+
func (ss *stepState) GetTestStepLabel() string {
79+
return ss.sb.TestStepLabel
80+
}
81+
82+
func (ss *stepState) Run(ctx xcontext.Context) error {
83+
ss.mu.Lock()
84+
defer ss.mu.Unlock()
85+
86+
select {
87+
case <-ss.stopped:
88+
return fmt.Errorf("stopped")
89+
default:
90+
}
91+
92+
if ss.stepRunner.Started() {
93+
return nil
94+
}
95+
96+
stepCtx, cancel := xcontext.WithCancel(ctx)
97+
stepCtx = stepCtx.WithField("step_index", strconv.Itoa(ss.stepIndex))
98+
stepCtx = stepCtx.WithField("step_label", ss.sb.TestStepLabel)
99+
100+
addTarget, resumeTargetsNotifiers, stepRunResult, err := ss.stepRunner.Run(stepCtx, ss.sb, ss.ev, ss.resumeState, ss.resumeStateTargets)
101+
if err != nil {
102+
return fmt.Errorf("failed to launch step runner: %v", err)
103+
}
104+
ss.cancel = cancel
105+
ss.addTarget = addTarget
106+
ss.resumeTargetsNotifiers = make(map[string]ChanNotifier)
107+
for i := 0; i < len(ss.resumeStateTargets); i++ {
108+
ss.resumeTargetsNotifiers[ss.resumeStateTargets[i].ID] = resumeTargetsNotifiers[i]
109+
}
110+
111+
go func() {
112+
defer func() {
113+
close(ss.stopped)
114+
stepCtx.Debugf("StepRunner fully stopped")
115+
}()
116+
117+
select {
118+
case stepErr := <-stepRunResult.NotifyCh():
119+
ss.SetError(stepCtx, stepErr)
120+
case <-stepCtx.Done():
121+
stepCtx.Debugf("Cancelled step context during waiting for step run result")
122+
}
123+
}()
124+
return nil
125+
}
126+
127+
func (ss *stepState) InjectTarget(ctx xcontext.Context, tgt *target.Target) (ChanNotifier, error) {
128+
ss.mu.Lock()
129+
defer ss.mu.Unlock()
130+
131+
if !ss.stepRunner.Started() {
132+
return nil, fmt.Errorf("step was not started")
133+
}
134+
if notifier := ss.resumeTargetsNotifiers[tgt.ID]; notifier != nil {
135+
return notifier, nil
136+
}
137+
return ss.addTarget(ctx, tgt)
138+
}
139+
140+
func (ss *stepState) NotifyStopped() <-chan struct{} {
141+
return ss.stopped
142+
}
143+
144+
func (ss *stepState) GetError() error {
145+
ss.mu.Lock()
146+
defer ss.mu.Unlock()
147+
148+
return ss.runErr
149+
}
150+
151+
func (ss *stepState) String() string {
152+
return fmt.Sprintf("[#%d %s]", ss.stepIndex, ss.sb.TestStepLabel)
153+
}
154+
155+
func (ss *stepState) SetError(ctx xcontext.Context, err error) {
156+
ss.mu.Lock()
157+
defer ss.mu.Unlock()
158+
159+
if err == nil || ss.runErr != nil {
160+
return
161+
}
162+
ctx.Errorf("Step '%s' failed with error: %v", ss.sb.TestStepLabel, err)
163+
ss.runErr = err
164+
165+
if ss.runErr != xcontext.ErrPaused && ss.runErr != xcontext.ErrCanceled {
166+
if err := emitEvent(ctx, ss.ev, EventTestError, nil, err.Error()); err != nil {
167+
ctx.Errorf("failed to emit event: %s", err)
168+
}
169+
}
170+
171+
// notify last as callback may use GetError or cancel context
172+
go func() {
173+
ss.onError(err)
174+
}()
175+
}

0 commit comments

Comments
 (0)