Skip to content

Commit 9da9561

Browse files
TerryhungTerry
andauthored
feat: add new option for walk config (#1212)
* Add new option for walk config --------- Co-authored-by: Terry <[email protected]>
1 parent 8814fc4 commit 9da9561

File tree

6 files changed

+48
-20
lines changed

6 files changed

+48
-20
lines changed

chain/walk/walker.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,30 @@ import (
1717

1818
var log = logging.Logger("lily/chain/walk")
1919

20-
func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter) *Walker {
20+
func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter, stopOnError bool) *Walker {
2121
return &Walker{
22-
node: node,
23-
obs: obs,
24-
name: name,
25-
tasks: tasks,
26-
minHeight: minHeight,
27-
maxHeight: maxHeight,
28-
report: r,
22+
node: node,
23+
obs: obs,
24+
name: name,
25+
tasks: tasks,
26+
minHeight: minHeight,
27+
maxHeight: maxHeight,
28+
report: r,
29+
stopOnError: stopOnError,
2930
}
3031
}
3132

3233
// Walker is a job that indexes blocks by walking the chain history.
3334
type Walker struct {
34-
node lens.API
35-
obs indexer.Indexer
36-
name string
37-
tasks []string
38-
minHeight int64 // limit persisting to tipsets equal to or above this height
39-
maxHeight int64 // limit persisting to tipsets equal to or below this height}
40-
done chan struct{}
41-
report *schedule.Reporter
35+
node lens.API
36+
obs indexer.Indexer
37+
name string
38+
tasks []string
39+
minHeight int64 // limit persisting to tipsets equal to or above this height
40+
maxHeight int64 // limit persisting to tipsets equal to or below this height}
41+
done chan struct{}
42+
report *schedule.Reporter
43+
stopOnError bool
4244
}
4345

4446
// Run starts walking the chain history and continues until the context is done or
@@ -92,6 +94,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)
9294
defer span.End()
9395

9496
var err error
97+
errs := []error{}
9598
for int64(ts.Height()) >= c.minHeight && ts.Height() != 0 {
9699
select {
97100
case <-ctx.Done():
@@ -102,7 +105,15 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)
102105
c.report.UpdateCurrentHeight(int64(ts.Height()))
103106
if success, err := c.obs.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Walk), indexer.WithTasks(c.tasks)); err != nil {
104107
span.RecordError(err)
105-
return fmt.Errorf("notify tipset: %w", err)
108+
err := fmt.Errorf("index tipset, height: %v, error: %v", ts.Height().String(), err)
109+
log.Errorf("%v", err)
110+
// collect error
111+
errs = append(errs, err)
112+
113+
// return an error only if the "stopOnError" flag is set to true.
114+
if c.stopOnError {
115+
return err
116+
}
106117
} else if !success {
107118
log.Errorw("walk incomplete", "height", ts.Height(), "tipset", ts.Key().String(), "reporter", c.name)
108119
}
@@ -115,5 +126,9 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)
115126
}
116127
}
117128

129+
if len(errs) > 0 {
130+
return fmt.Errorf("errors: %v", errs)
131+
}
132+
118133
return nil
119134
}

chain/walk/walker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestWalker(t *testing.T) {
5656

5757
t.Logf("initializing indexer")
5858
reporter := &schedule.Reporter{}
59-
idx := NewWalker(im, nodeAPI, t.Name(), []string{tasktype.BlocksTask}, 0, int64(head.Height()), reporter)
59+
idx := NewWalker(im, nodeAPI, t.Name(), []string{tasktype.BlocksTask}, 0, int64(head.Height()), reporter, false)
6060

6161
t.Logf("indexing chain")
6262
err = idx.WalkChain(ctx, nodeAPI, head)

commands/job/job.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ var JobRunCmd = &cli.Command{
3535
RunRestartDelayFlag,
3636
RunRestartFailure,
3737
RunRestartCompletion,
38+
StopOnError,
3839
},
3940
Subcommands: []*cli.Command{
4041
WalkCmd,

commands/job/options.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type runOpts struct {
2121

2222
RestartCompletion bool
2323
RestartFailure bool
24+
StopOnError bool
2425
}
2526

2627
func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig {
@@ -35,6 +36,7 @@ func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig {
3536
RestartOnFailure: RunFlags.RestartFailure,
3637
RestartOnCompletion: RunFlags.RestartCompletion,
3738
RestartDelay: RunFlags.RestartDelay,
39+
StopOnError: RunFlags.StopOnError,
3840
}
3941
}
4042

@@ -96,6 +98,14 @@ var RunRestartFailure = &cli.BoolFlag{
9698
Destination: &RunFlags.RestartFailure,
9799
}
98100

101+
var StopOnError = &cli.BoolFlag{
102+
Name: "stop-on-error",
103+
Usage: "Stop the job if it encounters an error.",
104+
EnvVars: []string{"LILY_JOB_STOP_ON_ERROR"},
105+
Value: false,
106+
Destination: &RunFlags.StopOnError,
107+
}
108+
99109
type notifyOps struct {
100110
queue string
101111
}

lens/lily/api.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ type LilyJobConfig struct {
9494
RestartOnFailure bool
9595
// RestartOnCompletion when true will restart the job when it completes.
9696
RestartOnCompletion bool
97+
// RestartOnCompletion when true will restart the job when it completes.
98+
StopOnError bool
9799
// RestartDelay configures how long to wait before restarting the job.
98100
RestartDelay time.Duration
99101
// Storage is the name of the storage system the job will use, may be empty.

lens/lily/impl.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul
328328
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
329329
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
330330
RestartDelay: cfg.JobConfig.RestartDelay,
331-
Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter),
331+
Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter, cfg.JobConfig.StopOnError),
332332
Reporter: reporter,
333333
}
334334

@@ -356,7 +356,7 @@ func (m *LilyNodeAPI) LilyWalkNotify(_ context.Context, cfg *LilyWalkNotifyConfi
356356
RestartOnFailure: cfg.WalkConfig.JobConfig.RestartOnFailure,
357357
RestartOnCompletion: cfg.WalkConfig.JobConfig.RestartOnCompletion,
358358
RestartDelay: cfg.WalkConfig.JobConfig.RestartDelay,
359-
Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter),
359+
Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter, cfg.WalkConfig.JobConfig.StopOnError),
360360
Reporter: reporter,
361361
}
362362
res := m.Scheduler.Submit(jobConfig)

0 commit comments

Comments
 (0)