Skip to content

Commit 3b3243a

Browse files
author
Greg Soltis
authored
Existing and error logs behavior (#4656)
### Description - Adds an integration test for existing behavior (`continue.t`) - Fixes behavior of `--output-logs=errors-only` to match behavior for `full` in the case of a task that errors - Fixes `--continue` behavior to report all failed tasks at the end ### Testing Instructions New `continue.t` integration test. I recommend viewing the changes to that file at each commit to see initial behavior and how it changes. Fixes #4504 link WEB-895
1 parent 85a95ea commit 3b3243a

File tree

11 files changed

+208
-32
lines changed

11 files changed

+208
-32
lines changed

cli/internal/core/engine.go

Lines changed: 74 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"sort"
88
"strings"
9+
"sync"
910
"sync/atomic"
1011

1112
"github.com/vercel/turbo/cli/internal/fs"
@@ -72,40 +73,88 @@ type EngineExecutionOptions struct {
7273
Concurrency int
7374
}
7475

76+
// StopExecutionSentinel is used to return an error from a graph Walk that indicates that
77+
// all further walking should stop.
78+
type StopExecutionSentinel struct {
79+
err error
80+
}
81+
82+
// StopExecution wraps the given error in a sentinel error indicating that
83+
// graph traversal should stop. Note that this will stop all tasks, not just
84+
// downstream tasks.
85+
func StopExecution(reason error) *StopExecutionSentinel {
86+
return &StopExecutionSentinel{
87+
err: reason,
88+
}
89+
}
90+
91+
// Error implements error.Error for StopExecutionSentinel
92+
func (se *StopExecutionSentinel) Error() string {
93+
return fmt.Sprintf("Execution stopped due to error: %v", se.err)
94+
}
95+
7596
// Execute executes the pipeline, constructing an internal task graph and walking it accordingly.
7697
func (e *Engine) Execute(visitor Visitor, opts EngineExecutionOptions) []error {
7798
var sema = util.NewSemaphore(opts.Concurrency)
7899
var errored int32
79-
return e.TaskGraph.Walk(func(v dag.Vertex) error {
80-
// If something has already errored, short-circuit.
81-
// There is a race here between concurrent tasks. However, if there is not a
82-
// dependency edge between them, we are not required to have a strict order
83-
// between them, so a failed task can fail to short-circuit a concurrent
84-
// task that happened to be starting at the same time.
85-
if atomic.LoadInt32(&errored) != 0 {
86-
return nil
87-
}
88-
// Each vertex in the graph is a taskID (package#task format)
89-
taskID := dag.VertexName(v)
90100

91-
// Always return if it is the root node
92-
if strings.Contains(taskID, ROOT_NODE_NAME) {
93-
return nil
94-
}
101+
// The dag library's behavior is that returning an error from the Walk callback cancels downstream
102+
// tasks, but not unrelated tasks.
103+
// The behavior we want is to either cancel everything or nothing (--continue). So, we do our own
104+
// error handling. Collect any errors that occur in "errors", and report them as the result of
105+
// Execute. panic on any other error returned by Walk.
106+
var errorMu sync.Mutex
107+
var errors []error
108+
recordErr := func(err error) {
109+
errorMu.Lock()
110+
defer errorMu.Unlock()
111+
errors = append(errors, err)
112+
}
113+
unusedErrs := e.TaskGraph.Walk(func(v dag.Vertex) error {
114+
// Use an extra func() to ensure that we are not returning any errors to Walk
115+
func() {
116+
// If something has already errored, short-circuit.
117+
// There is a race here between concurrent tasks. However, if there is not a
118+
// dependency edge between them, we are not required to have a strict order
119+
// between them, so a failed task can fail to short-circuit a concurrent
120+
// task that happened to be starting at the same time.
121+
if atomic.LoadInt32(&errored) != 0 {
122+
return
123+
}
124+
// Each vertex in the graph is a taskID (package#task format)
125+
taskID := dag.VertexName(v)
95126

96-
// Acquire the semaphore unless parallel
97-
if !opts.Parallel {
98-
sema.Acquire()
99-
defer sema.Release()
100-
}
127+
// Always return if it is the root node
128+
if strings.Contains(taskID, ROOT_NODE_NAME) {
129+
return
130+
}
101131

102-
if err := visitor(taskID); err != nil {
103-
// We only ever flip from false to true, so we don't need to compare and swap the atomic
104-
atomic.StoreInt32(&errored, 1)
105-
return err
106-
}
132+
// Acquire the semaphore unless parallel
133+
if !opts.Parallel {
134+
sema.Acquire()
135+
defer sema.Release()
136+
}
137+
138+
if err := visitor(taskID); err != nil {
139+
if se, ok := err.(*StopExecutionSentinel); ok {
140+
// We only ever flip from false to true, so we don't need to compare and swap the atomic
141+
atomic.StoreInt32(&errored, 1)
142+
recordErr(se.err)
143+
// Note: returning an error here would cancel execution of downstream tasks only, and show
144+
// up in the errors returned from Walk. However, we are doing our own error collection
145+
// and intentionally ignoring errors from walk, so fallthrough and use the "errored" mechanism
146+
// to skip downstream tasks
147+
} else {
148+
recordErr(err)
149+
}
150+
}
151+
}()
107152
return nil
108153
})
154+
if len(unusedErrs) > 0 {
155+
panic("we should be handling execution errors via our own errors + errored mechanism")
156+
}
157+
return errors
109158
}
110159

111160
// MissingTaskError is a specialized Error thrown in the case that we can't find a task.

cli/internal/core/engine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func TestShortCircuiting(t *testing.T) {
7171
println(taskID)
7272
executed[taskID] = true
7373
if taskID == "b#build" {
74-
return expectedErr
74+
return StopExecution(expectedErr)
7575
}
7676
return nil
7777
}

cli/internal/run/real_run.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas
322322

323323
ec.logError(prettyPrefix, err)
324324
if !ec.rs.Opts.runOpts.ContinueOnError {
325-
return nil, errors.Wrapf(err, "failed to capture outputs for \"%v\"", packageTask.TaskID)
325+
return nil, core.StopExecution(errors.Wrapf(err, "failed to capture outputs for \"%v\"", packageTask.TaskID))
326326
}
327327
}
328328

@@ -381,19 +381,18 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas
381381
tracer(runsummary.TargetBuildFailed, err, nil)
382382
}
383383

384+
// If there was an error, flush the buffered output
385+
taskCache.OnError(prefixedUI, progressLogger)
384386
progressLogger.Error(fmt.Sprintf("Error: command finished with error: %v", err))
385387
if !ec.rs.Opts.runOpts.ContinueOnError {
386388
prefixedUI.Error(fmt.Sprintf("ERROR: command finished with error: %s", err))
387389
ec.processes.Close()
390+
// We're not continuing, stop graph traversal
391+
err = core.StopExecution(err)
388392
} else {
389393
prefixedUI.Warn("command finished with error, but continuing...")
390-
// Set to nil so we don't short-circuit any other execution
391-
err = nil
392394
}
393395

394-
// If there was an error, flush the buffered output
395-
taskCache.OnError(prefixedUI, progressLogger)
396-
397396
return taskExecutionSummary, err
398397
}
399398

cli/internal/runcache/runcache.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ func (tc TaskCache) ReplayLogFile(prefixedUI *cli.PrefixedUi, progressLogger hcl
190190
// This is called if the task exited with an non-zero error code.
191191
func (tc TaskCache) OnError(terminal *cli.PrefixedUi, logger hclog.Logger) {
192192
if tc.taskOutputMode == util.ErrorTaskOutput {
193+
terminal.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(tc.hash)))
193194
tc.ReplayLogFile(terminal, logger)
194195
}
195196
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
node_modules/
2+
.turbo
3+
.npmrc
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"name": "my-app",
3+
"scripts": {
4+
"build": "echo 'working'"
5+
},
6+
"dependencies": {
7+
"some-lib": "*"
8+
}
9+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"name": "other-app",
3+
"scripts": {
4+
"build": "exit 3"
5+
},
6+
"dependencies": {
7+
"some-lib": "*"
8+
}
9+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"name": "some-lib",
3+
"scripts": {
4+
"build": "exit 2"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"name": "monorepo",
3+
"workspaces": [
4+
"apps/**"
5+
]
6+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"$schema": "https://turbo.build/schema.json",
3+
"pipeline": {
4+
"build": {
5+
"dependsOn": ["^build"],
6+
"outputs": []
7+
}
8+
}
9+
}

0 commit comments

Comments
 (0)