Skip to content

Commit 93e902c

Browse files
committed
Pipeline: handle FinishEarly errors at this level
Handle `FinishEarly` errors at the `Pipeline` level rather than at the `Scanner` level. This is preparation for the next change.
1 parent 5677b0d commit 93e902c

File tree

2 files changed

+30
-15
lines changed

2 files changed

+30
-15
lines changed

internal/pipe/pipeline.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package pipe
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"fmt"
78
"io"
89
"io/ioutil"
@@ -17,6 +18,13 @@ type Env struct {
1718
Dir string
1819
}
1920

21+
// FinishEarly is an error that can be returned by a `Stage` to
22+
// request that the iteration be ended early (possibly without reading
23+
// all of its input). This "error" is considered a successful return,
24+
// and is not reported to the caller.
25+
//nolint:revive
26+
var FinishEarly = errors.New("finish stage early")
27+
2028
// Pipeline represents a Unix-like pipe that can include multiple
2129
// stages, including external processes but also and stages written in
2230
// Go.
@@ -186,12 +194,27 @@ func (p *Pipeline) Wait() error {
186194
s := p.stages[i]
187195
err := s.Wait()
188196

189-
// We want to report the error that is most informative. We
190-
// take that to be the error from the earliest pipeline stage
191-
// that failed of a non-pipe error. If that didn't happen,
192-
// take the error from the last pipeline stage that failed due
193-
// to a pipe error.
194-
if err != nil && (earliestStageErr == nil || !IsPipeError(err)) {
197+
// Error handling:
198+
199+
if err == nil {
200+
// No error to handle.
201+
continue
202+
}
203+
204+
if err == FinishEarly {
205+
// We ignore `FinishEarly` errors because that is how a
206+
// stage informs us that it intentionally finished early.
207+
continue
208+
}
209+
210+
// If we reach this point, then the stage exited with a
211+
// non-ignorable error. But multiple stages might report
212+
// errors, and we want to report the one that is most
213+
// informative. We take that to be the error from the earliest
214+
// pipeline stage that failed from a non-pipe error. If that
215+
// didn't happen, take the error from the last pipeline stage
216+
// that failed due to a pipe error.
217+
if earliestStageErr == nil || !IsPipeError(err) {
195218
// Overwrite any existing values here so that we end up
196219
// retaining the last error that we see; i.e., the error
197220
// that happened earliest in the pipeline.

internal/pipe/scanner.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package pipe
33
import (
44
"bufio"
55
"context"
6-
"errors"
76
"io"
87
)
98

@@ -16,12 +15,6 @@ type Scanner interface {
1615
Err() error
1716
}
1817

19-
// FinishEarly is an error that can be returned by a
20-
// `LinewiseStageFunc` to request that the iteration be ended early,
21-
// without an error.
22-
//nolint:revive
23-
var FinishEarly = errors.New("finish stage early")
24-
2518
// NewScannerFunc is used to create a `Scanner` for scanning input
2619
// that is coming from `r`.
2720
type NewScannerFunc func(r io.Reader) (Scanner, error)
@@ -32,7 +25,7 @@ type NewScannerFunc func(r io.Reader) (Scanner, error)
3225
func ScannerFunction(
3326
name string, newScanner NewScannerFunc, f LinewiseStageFunc,
3427
) Stage {
35-
stage := Function(
28+
return Function(
3629
name,
3730
func(ctx context.Context, env Env, stdin io.Reader, stdout io.Writer) (theErr error) {
3831
scanner, err := newScanner(stdin)
@@ -71,5 +64,4 @@ func ScannerFunction(
7164
// `p.AddFunction()` arranges for `stdout` to be closed.
7265
},
7366
)
74-
return IgnoreError(stage, IsError(FinishEarly))
7567
}

0 commit comments

Comments
 (0)