Skip to content

Commit 308f506

Browse files
Merge #4165
4165: Cleanup stop control component r=janezpodhostnik a=janezpodhostnik I was trying to extract this change https://github.com/onflow/flow-go/pull/3736/files#diff-c495742b9ab8ee8b5b050d102767ba106b2fda1cbf4f1c0a49acee333a118c0fR107 out of this PR #3736, but I wasn't quite sure what was going on, so I did some cleaning as well. - changed some naming - changed formatting - added todos to look into in the future I think they might be related to these tests being disabled: ``` func Test_OnlyHeadOfTheQueueIsExecuted(t *testing.T) { unittest.SkipUnless(t, unittest.TEST_FLAKY, "To be fixed later") ``` ``` func TestBlocksArentExecutedMultipleTimes_multipleBlockEnqueue(t *testing.T) { unittest.SkipUnless(t, unittest.TEST_TODO, "broken test") ``` I'll take a look at that next. Co-authored-by: Janez Podhostnik <[email protected]>
2 parents a16142d + a0592a8 commit 308f506

File tree

1 file changed

+134
-54
lines changed

1 file changed

+134
-54
lines changed

engine/execution/ingestion/stop_control.go

Lines changed: 134 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,45 +17,55 @@ import (
1717
// StopControl follows states described in StopState
1818
type StopControl struct {
1919
sync.RWMutex
20-
// desired stop height, the first value new version should be used, so this height WON'T
21-
// be executed
22-
height uint64
20+
// desired stopHeight, the first value new version should be used,
21+
// so this height WON'T be executed
22+
stopHeight uint64
2323

24-
// if the node should crash or just pause after reaching stop height
25-
crash bool
24+
// if the node should crash or just pause after reaching stopHeight
25+
crash bool
26+
27+
// This is the block ID of the block that should be executed last.
2628
stopAfterExecuting flow.Identifier
2729

2830
log zerolog.Logger
2931
state StopControlState
3032

31-
// used to prevent setting stop height to block which has already been executed
33+
// used to prevent setting stopHeight to block which has already been executed
3234
highestExecutingHeight uint64
3335
}
3436

3537
type StopControlState byte
3638

3739
const (
38-
// StopControlOff default state, envisioned to be used most of the time. Stopping module is simply off,
39-
// blocks will be processed "as usual".
40+
// StopControlOff default state, envisioned to be used most of the time.
41+
// Stopping module is simply off, blocks will be processed "as usual".
4042
StopControlOff StopControlState = iota
4143

42-
// StopControlSet means stop height is set but not reached yet, and nothing related to stopping happened yet.
44+
// StopControlSet means stopHeight is set but not reached yet,
45+
// and nothing related to stopping happened yet.
4346
// We could still go back to StopControlOff or progress to StopControlCommenced.
4447
StopControlSet
4548

46-
// StopControlCommenced indicates that stopping process has commenced and no parameters can be changed anymore.
47-
// For example, blocks at or above stop height has been received, but finalization didn't reach stop height yet.
49+
// StopControlCommenced indicates that stopping process has commenced
50+
// and no parameters can be changed anymore.
51+
// For example, blocks at or above stopHeight has been received,
52+
// but finalization didn't reach stopHeight yet.
4853
// It can only progress to StopControlPaused
4954
StopControlCommenced
5055

51-
// StopControlPaused means EN has stopped processing blocks. It can happen by reaching the set stopping `height`, or
56+
// StopControlPaused means EN has stopped processing blocks.
57+
// It can happen by reaching the set stopping `stopHeight`, or
5258
// if the node was started in pause mode.
5359
// It is a final state and cannot be changed
5460
StopControlPaused
5561
)
5662

5763
// NewStopControl creates new empty NewStopControl
58-
func NewStopControl(log zerolog.Logger, paused bool, lastExecutedHeight uint64) *StopControl {
64+
func NewStopControl(
65+
log zerolog.Logger,
66+
paused bool,
67+
lastExecutedHeight uint64,
68+
) *StopControl {
5969
state := StopControlOff
6070
if paused {
6171
state = StopControlPaused
@@ -82,61 +92,84 @@ func (s *StopControl) IsPaused() bool {
8292
return s.state == StopControlPaused
8393
}
8494

85-
// SetStopHeight sets new stop height and crash mode, and return old values:
86-
// - height
95+
// SetStopHeight sets new stopHeight and crash mode, and return old values:
96+
// - stopHeight
8797
// - crash
8898
//
8999
// Returns error if the stopping process has already commenced, new values will be rejected.
90-
func (s *StopControl) SetStopHeight(height uint64, crash bool) (uint64, bool, error) {
100+
func (s *StopControl) SetStopHeight(
101+
height uint64,
102+
crash bool,
103+
) (uint64, bool, error) {
91104
s.Lock()
92105
defer s.Unlock()
93106

94-
oldHeight := s.height
107+
oldHeight := s.stopHeight
95108
oldCrash := s.crash
96109

97110
if s.state == StopControlCommenced {
98-
return oldHeight, oldCrash, fmt.Errorf("cannot update stop height, stopping commenced for height %d with crash=%t", oldHeight, oldCrash)
111+
return oldHeight,
112+
oldCrash,
113+
fmt.Errorf(
114+
"cannot update stopHeight, "+
115+
"stopping commenced for stopHeight %d with crash=%t",
116+
oldHeight,
117+
oldCrash,
118+
)
99119
}
100120

101121
if s.state == StopControlPaused {
102-
return oldHeight, oldCrash, fmt.Errorf("cannot update stop height, already paused")
122+
return oldHeight,
123+
oldCrash,
124+
fmt.Errorf("cannot update stopHeight, already paused")
103125
}
104126

105-
// +1 because we track last executing height, so +1 is the lowest possible block to stop
106-
if height <= s.highestExecutingHeight+1 {
107-
return oldHeight, oldCrash, fmt.Errorf("cannot update stop height, given height %d at or below last executed %d", height, s.highestExecutingHeight)
127+
// cannot set stopHeight to block which is already executing
128+
// so the lowest possible stopHeight is highestExecutingHeight+1
129+
if height <= s.highestExecutingHeight {
130+
return oldHeight,
131+
oldCrash,
132+
fmt.Errorf(
133+
"cannot update stopHeight, "+
134+
"given stopHeight %d below or equal to highest executing height %d",
135+
height,
136+
s.highestExecutingHeight,
137+
)
108138
}
109139

110140
s.log.Info().
111-
Int8("previous_state", int8(s.state)).Int8("new_state", int8(StopControlSet)).
112-
Uint64("height", height).Bool("crash", crash).
113-
Uint64("old_height", oldHeight).Bool("old_crash", oldCrash).Msg("new stop height set")
141+
Int8("previous_state", int8(s.state)).
142+
Int8("new_state", int8(StopControlSet)).
143+
Uint64("stopHeight", height).
144+
Bool("crash", crash).
145+
Uint64("old_height", oldHeight).
146+
Bool("old_crash", oldCrash).
147+
Msg("new stopHeight set")
114148

115149
s.state = StopControlSet
116150

117-
s.height = height
151+
s.stopHeight = height
118152
s.crash = crash
119153
s.stopAfterExecuting = flow.ZeroID
120154

121155
return oldHeight, oldCrash, nil
122156
}
123157

124158
// GetStopHeight returns:
125-
// - height
159+
// - stopHeight
126160
// - crash
127161
//
128162
// Values are undefined if they were not previously set
129163
func (s *StopControl) GetStopHeight() (uint64, bool) {
130164
s.RLock()
131165
defer s.RUnlock()
132166

133-
return s.height, s.crash
167+
return s.stopHeight, s.crash
134168
}
135169

136170
// blockProcessable should be called when new block is processable.
137171
// It returns boolean indicating if the block should be processed.
138172
func (s *StopControl) blockProcessable(b *flow.Header) bool {
139-
140173
s.Lock()
141174
defer s.Unlock()
142175

@@ -148,9 +181,19 @@ func (s *StopControl) blockProcessable(b *flow.Header) bool {
148181
return false
149182
}
150183

151-
// skips blocks at or above requested stop height
152-
if b.Height >= s.height {
153-
s.log.Warn().Int8("previous_state", int8(s.state)).Int8("new_state", int8(StopControlCommenced)).Msgf("Skipping execution of %s at height %d because stop has been requested at height %d", b.ID(), b.Height, s.height)
184+
// skips blocks at or above requested stopHeight
185+
if b.Height >= s.stopHeight {
186+
s.log.Warn().
187+
Int8("previous_state", int8(s.state)).
188+
Int8("new_state", int8(StopControlCommenced)).
189+
Msgf(
190+
"Skipping execution of %s at height %d"+
191+
" because stop has been requested at height %d",
192+
b.ID(),
193+
b.Height,
194+
s.stopHeight,
195+
)
196+
154197
s.state = StopControlCommenced // if block was skipped, move into commenced state
155198
return false
156199
}
@@ -159,7 +202,11 @@ func (s *StopControl) blockProcessable(b *flow.Header) bool {
159202
}
160203

161204
// blockFinalized should be called when a block is marked as finalized
162-
func (s *StopControl) blockFinalized(ctx context.Context, execState state.ReadOnlyExecutionState, h *flow.Header) {
205+
func (s *StopControl) blockFinalized(
206+
ctx context.Context,
207+
execState state.ReadOnlyExecutionState,
208+
h *flow.Header,
209+
) {
163210

164211
s.Lock()
165212
defer s.Unlock()
@@ -168,29 +215,38 @@ func (s *StopControl) blockFinalized(ctx context.Context, execState state.ReadOn
168215
return
169216
}
170217

171-
// Once finalization reached stop height we can be sure no other fork will be valid at this height,
218+
// Once finalization reached stopHeight we can be sure no other fork will be valid at this height,
172219
// if this block's parent has been executed, we are safe to stop or crash.
173220
// This will happen during normal execution, where blocks are executed before they are finalized.
174221
// However, it is possible that EN block computation progress can fall behind. In this case,
175-
// we want to crash only after the execution reached the stop height.
176-
if h.Height == s.height {
222+
// we want to crash only after the execution reached the stopHeight.
223+
if h.Height == s.stopHeight {
177224

178225
executed, err := state.IsBlockExecuted(ctx, execState, h.ParentID)
179226
if err != nil {
180227
// any error here would indicate unexpected storage error, so we crash the node
181-
s.log.Fatal().Err(err).Str("block_id", h.ID().String()).Msg("failed to check if the block has been executed")
228+
// TODO: what if the error is due to the node being stopped?
229+
// i.e. context cancelled?
230+
s.log.Fatal().
231+
Err(err).
232+
Str("block_id", h.ID().String()).
233+
Msg("failed to check if the block has been executed")
182234
return
183235
}
184236

185237
if executed {
186238
s.stopExecution()
187239
} else {
188240
s.stopAfterExecuting = h.ParentID
189-
s.log.Info().Msgf("Node scheduled to stop executing after executing block %s at height %d", s.stopAfterExecuting.String(), h.Height-1)
241+
s.log.Info().
242+
Msgf(
243+
"Node scheduled to stop executing"+
244+
" after executing block %s at height %d",
245+
s.stopAfterExecuting.String(),
246+
h.Height-1,
247+
)
190248
}
191-
192249
}
193-
194250
}
195251

196252
// blockExecuted should be called after a block has finished execution
@@ -203,37 +259,61 @@ func (s *StopControl) blockExecuted(h *flow.Header) {
203259
}
204260

205261
if s.stopAfterExecuting == h.ID() {
206-
// double check. Even if requested stop height has been changed multiple times,
262+
// double check. Even if requested stopHeight has been changed multiple times,
207263
// as long as it matches this block we are safe to terminate
208-
209-
if h.Height == s.height-1 {
264+
if h.Height == s.stopHeight-1 {
210265
s.stopExecution()
211266
} else {
212-
s.log.Warn().Msgf("Inconsistent stopping state. Scheduled to stop after executing block ID %s and height %d, but this block has a height %d. ",
213-
h.ID().String(), s.height-1, h.Height)
267+
s.log.Warn().
268+
Msgf(
269+
"Inconsistent stopping state. "+
270+
"Scheduled to stop after executing block ID %s and height %d, "+
271+
"but this block has a height %d. ",
272+
h.ID().String(),
273+
s.stopHeight-1,
274+
h.Height,
275+
)
214276
}
215277
}
216278
}
217279

218280
func (s *StopControl) stopExecution() {
219281
if s.crash {
220-
s.log.Fatal().Msgf("Crashing as finalization reached requested stop height %d and the highest executed block is (%d - 1)", s.height, s.height)
221-
} else {
222-
s.log.Debug().Int8("previous_state", int8(s.state)).Int8("new_state", int8(StopControlPaused)).Msg("StopControl state transition")
223-
s.state = StopControlPaused
224-
s.log.Warn().Msgf("Pausing execution as finalization reached requested stop height %d", s.height)
282+
s.log.Fatal().Msgf(
283+
"Crashing as finalization reached requested "+
284+
"stop height %d and the highest executed block is (%d - 1)",
285+
s.stopHeight,
286+
s.stopHeight,
287+
)
288+
return
225289
}
290+
291+
s.log.Debug().
292+
Int8("previous_state", int8(s.state)).
293+
Int8("new_state", int8(StopControlPaused)).
294+
Msg("StopControl state transition")
295+
296+
s.state = StopControlPaused
297+
298+
s.log.Warn().Msgf(
299+
"Pausing execution as finalization reached "+
300+
"the requested stop height %d",
301+
s.stopHeight,
302+
)
303+
226304
}
227305

228-
// executingBlockHeight should be called while execution of height starts, used for internal tracking of the minimum
229-
// possible value of height
306+
// executingBlockHeight should be called while execution of height starts,
307+
// used for internal tracking of the minimum possible value of stopHeight
230308
func (s *StopControl) executingBlockHeight(height uint64) {
309+
// TODO: should we lock here?
310+
231311
if s.state == StopControlPaused {
232312
return
233313
}
234314

235-
// updating the highest executing height, which will be used to reject setting stop height that
236-
// is too low.
315+
// updating the highest executing height, which will be used to reject setting
316+
// stopHeight that is too low.
237317
if height > s.highestExecutingHeight {
238318
s.highestExecutingHeight = height
239319
}

0 commit comments

Comments
 (0)