Skip to content

Commit 68e73f7

Browse files
committed
Pipeline: suppress pipe errors in some circumstances
Consider a pipeline p := pipe.New() p.Add(s1, s2, s3, s4) Suppose that stage `s3` sometimes intentionally exits early without reading all of its `stdin`. When this happens, it is common for the previous stage `s2` to fail with a pipe error. This is because `s2` often continues trying to write to its stdout, but the other end of its stdout is `s3`'s stdin, which got closed when `s3` exited. The result is that `s2` fails with `SIGPIPE`, `syscall.EPIPE`, or `io.ErrClosedPipe`. But if it is expected that `s3` exits early, then a pipe error from `s2` is uninteresting. The current way of dealing with this situation is to explicitly wrap `s2` with something like `pipe.IgnoreError(s2, IsPipeError)`, which causes a pipe error from that stage to be ignored. But often that's not the end of the story. If `s2` exits due to a pipe error, then it is often the case that `s1` will _also_ exit due to a pipe error trying to write to _its_ stdout. So to handle this situation correctly, the pipe would have to be created like p := pipe.New() p.Add( pipe.IgnoreError(s1, IsPipeError), pipe.IgnoreError(s2, IsPipeError), s3, s4, ) This is verbose, and experience shows that it is easy to forget. Let's make this simpler. If `s3` intentionally exits without reading all of its input, it should return the special `FinishEarly` error, thereby telling the enclosing pipeline that its early exit was intentional. In this case, change `Pipeline` to ignore any pipe error from the preceding stage, `s2`. Moreover, if `s2` exits with a pipe error, ignore any pipe error from `s1`, and so on. However, if a stage exits without an error, then again consider a pipe error from the previous stage to be interesting. There is some chance that this could suppress actual, interesting errors. For example, `s2` might exit due to a `SIGPIPE` that it got when trying to write to a file descriptor other than stdout, for example when writing to a subprocess or to a socket, and not handle that `EPIPE` internally. But I think that these situations will be vanishingly rare compared to the more common case described above, which has caused real-life irritation. Also add tests of the new behavior.
1 parent 93e902c commit 68e73f7

File tree

2 files changed

+247
-25
lines changed

2 files changed

+247
-25
lines changed

internal/pipe/pipeline.go

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -190,35 +190,69 @@ func (p *Pipeline) Wait() error {
190190
var earliestStageErr error
191191
var earliestFailedStage Stage
192192

193+
finishedEarly := false
193194
for i := len(p.stages) - 1; i >= 0; i-- {
194195
s := p.stages[i]
195196
err := s.Wait()
196197

197-
// Error handling:
198-
199-
if err == nil {
200-
// No error to handle.
198+
// Handle errors:
199+
switch {
200+
case err == nil:
201+
// No error to handle. But unset the `finishedEarly` flag,
202+
// because earlier stages shouldn't be affected by the
203+
// later stage that finished early.
204+
finishedEarly = false
201205
continue
202-
}
203206

204-
if err == FinishEarly {
207+
case err == FinishEarly:
205208
// We ignore `FinishEarly` errors because that is how a
206209
// stage informs us that it intentionally finished early.
210+
// Moreover, if we see a `FinishEarly` error, ignore any
211+
// pipe error from the immediately preceding stage,
212+
// because it probably came from trying to write to this
213+
// stage after this stage closed its stdin.
214+
finishedEarly = true
207215
continue
208-
}
209216

210-
// If we reach this point, then the stage exited with a
211-
// non-ignorable error. But multiple stages might report
212-
// errors, and we want to report the one that is most
213-
// informative. We take that to be the error from the earliest
214-
// pipeline stage that failed from a non-pipe error. If that
215-
// didn't happen, take the error from the last pipeline stage
216-
// that failed due to a pipe error.
217-
if earliestStageErr == nil || !IsPipeError(err) {
218-
// Overwrite any existing values here so that we end up
219-
// retaining the last error that we see; i.e., the error
220-
// that happened earliest in the pipeline.
217+
case IsPipeError(err):
218+
switch {
219+
case finishedEarly:
220+
// A successor stage finished early. It is common for
221+
// this to cause earlier stages to fail with pipe
222+
// errors. Such errors are uninteresting, so ignore
223+
// them. Leave the `finishedEarly` flag set, because
224+
// the preceding stage might get a pipe error from
225+
// trying to write to this one.
226+
case earliestStageErr != nil:
227+
// A later stage has already reported an error. This
228+
// means that we don't want to report the error from
229+
// this stage:
230+
//
231+
// * If the later error was also a pipe error: we want
232+
// to report the _last_ pipe error seen, which would
233+
// be the one already recorded.
234+
//
235+
// * If the later error was not a pipe error: non-pipe
236+
// errors are always considered more important than
237+
// pipe errors, so again we would want to keep the
238+
// error that is already recorded.
239+
default:
240+
// In this case, the pipe error from this stage is the
241+
// most important error that we have seen so far, so
242+
// remember it:
243+
earliestFailedStage, earliestStageErr = s, err
244+
}
245+
246+
default:
247+
// This stage exited with a non-pipe error. If multiple
248+
// stages exited with such errors, we want to report the
249+
// one that is most informative. We take that to be the
250+
// error from the earliest failing stage. Since we are
251+
// iterating through stages in reverse order, overwrite
252+
// any existing remembered errors (which would have come
253+
// from a later stage):
221254
earliestFailedStage, earliestStageErr = s, err
255+
finishedEarly = false
222256
}
223257
}
224258

internal/pipe/pipeline_test.go

Lines changed: 195 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -514,14 +514,12 @@ func TestScannerFinishEarly(t *testing.T) {
514514
var length int64
515515

516516
p := pipe.New()
517-
// Print the numbers from 1 to 20 (generated from scratch):
518517
p.Add(
519-
pipe.IgnoreError(
520-
seqFunction(20),
521-
pipe.IsPipeError,
522-
),
523-
// Pass the numbers through up to 7, then exit with an
524-
// ignored error:
518+
// Print the numbers from 1 to 20 (generated from scratch):
519+
seqFunction(20),
520+
521+
// Pass the numbers through up to 7, then exit with an ignored
522+
// error:
525523
pipe.LinewiseFunction(
526524
"finish-after-7",
527525
func(_ context.Context, _ pipe.Env, line []byte, w *bufio.Writer) error {
@@ -532,6 +530,7 @@ func TestScannerFinishEarly(t *testing.T) {
532530
return nil
533531
},
534532
),
533+
535534
// Read the numbers and add them into the sum:
536535
pipe.Function(
537536
"compute-length",
@@ -574,6 +573,189 @@ func TestPrintf(t *testing.T) {
574573
}
575574
}
576575

576+
func TestErrors(t *testing.T) {
577+
t.Parallel()
578+
ctx := context.Background()
579+
580+
err1 := errors.New("error1")
581+
err2 := errors.New("error2")
582+
583+
for _, tc := range []struct {
584+
name string
585+
stages []pipe.Stage
586+
expectedErr error
587+
}{
588+
{
589+
name: "no-error",
590+
stages: []pipe.Stage{
591+
pipe.Function("noop1", genErr(nil)),
592+
pipe.Function("noop2", genErr(nil)),
593+
pipe.Function("noop3", genErr(nil)),
594+
},
595+
expectedErr: nil,
596+
},
597+
{
598+
name: "lonely-error",
599+
stages: []pipe.Stage{
600+
pipe.Function("err1", genErr(err1)),
601+
},
602+
expectedErr: err1,
603+
},
604+
{
605+
name: "error",
606+
stages: []pipe.Stage{
607+
pipe.Function("noop1", genErr(nil)),
608+
pipe.Function("err1", genErr(err1)),
609+
pipe.Function("noop2", genErr(nil)),
610+
},
611+
expectedErr: err1,
612+
},
613+
{
614+
name: "two-consecutive-errors",
615+
stages: []pipe.Stage{
616+
pipe.Function("noop1", genErr(nil)),
617+
pipe.Function("err1", genErr(err1)),
618+
pipe.Function("err2", genErr(err2)),
619+
pipe.Function("noop2", genErr(nil)),
620+
},
621+
expectedErr: err1,
622+
},
623+
{
624+
name: "pipe-then-error",
625+
stages: []pipe.Stage{
626+
pipe.Function("noop1", genErr(nil)),
627+
pipe.Function("pipe-error", genErr(io.ErrClosedPipe)),
628+
pipe.Function("err1", genErr(err1)),
629+
pipe.Function("noop2", genErr(nil)),
630+
},
631+
expectedErr: err1,
632+
},
633+
{
634+
name: "error-then-pipe",
635+
stages: []pipe.Stage{
636+
pipe.Function("noop1", genErr(nil)),
637+
pipe.Function("err1", genErr(err1)),
638+
pipe.Function("pipe-error", genErr(io.ErrClosedPipe)),
639+
pipe.Function("noop2", genErr(nil)),
640+
},
641+
expectedErr: err1,
642+
},
643+
{
644+
name: "two-spaced-errors",
645+
stages: []pipe.Stage{
646+
pipe.Function("noop1", genErr(nil)),
647+
pipe.Function("err1", genErr(err1)),
648+
pipe.Function("noop2", genErr(nil)),
649+
pipe.Function("err2", genErr(err2)),
650+
pipe.Function("noop3", genErr(nil)),
651+
},
652+
expectedErr: err1,
653+
},
654+
{
655+
name: "finish-early-ignored",
656+
stages: []pipe.Stage{
657+
pipe.Function("noop1", genErr(nil)),
658+
pipe.Function("finish-early1", genErr(pipe.FinishEarly)),
659+
pipe.Function("noop2", genErr(nil)),
660+
pipe.Function("finish-early2", genErr(pipe.FinishEarly)),
661+
pipe.Function("noop3", genErr(nil)),
662+
},
663+
expectedErr: nil,
664+
},
665+
{
666+
name: "error-before-finish-early",
667+
stages: []pipe.Stage{
668+
pipe.Function("err1", genErr(err1)),
669+
pipe.Function("finish-early", genErr(pipe.FinishEarly)),
670+
},
671+
expectedErr: err1,
672+
},
673+
{
674+
name: "error-after-finish-early",
675+
stages: []pipe.Stage{
676+
pipe.Function("finish-early", genErr(pipe.FinishEarly)),
677+
pipe.Function("err1", genErr(err1)),
678+
},
679+
expectedErr: err1,
680+
},
681+
{
682+
name: "pipe-then-finish-early",
683+
stages: []pipe.Stage{
684+
pipe.Function("pipe-error", genErr(io.ErrClosedPipe)),
685+
pipe.Function("finish-early", genErr(pipe.FinishEarly)),
686+
},
687+
expectedErr: nil,
688+
},
689+
{
690+
name: "pipe-then-two-finish-early",
691+
stages: []pipe.Stage{
692+
pipe.Function("pipe-error", genErr(io.ErrClosedPipe)),
693+
pipe.Function("finish-early1", genErr(pipe.FinishEarly)),
694+
pipe.Function("finish-early2", genErr(pipe.FinishEarly)),
695+
},
696+
expectedErr: nil,
697+
},
698+
{
699+
name: "two-pipe-then-finish-early",
700+
stages: []pipe.Stage{
701+
pipe.Function("pipe-error1", genErr(io.ErrClosedPipe)),
702+
pipe.Function("pipe-error2", genErr(io.ErrClosedPipe)),
703+
pipe.Function("finish-early", genErr(pipe.FinishEarly)),
704+
},
705+
expectedErr: nil,
706+
},
707+
{
708+
name: "pipe-then-finish-early-with-gap",
709+
stages: []pipe.Stage{
710+
pipe.Function("pipe-error", genErr(io.ErrClosedPipe)),
711+
pipe.Function("noop", genErr(nil)),
712+
pipe.Function("finish-early1", genErr(pipe.FinishEarly)),
713+
},
714+
expectedErr: io.ErrClosedPipe,
715+
},
716+
{
717+
name: "finish-early-then-pipe",
718+
stages: []pipe.Stage{
719+
pipe.Function("finish-early", genErr(pipe.FinishEarly)),
720+
pipe.Function("pipe-error", genErr(io.ErrClosedPipe)),
721+
},
722+
expectedErr: io.ErrClosedPipe,
723+
},
724+
{
725+
name: "error-then-pipe-then-finish-early",
726+
stages: []pipe.Stage{
727+
pipe.Function("err1", genErr(err1)),
728+
pipe.Function("pipe-error", genErr(io.ErrClosedPipe)),
729+
pipe.Function("finish-early", genErr(pipe.FinishEarly)),
730+
},
731+
expectedErr: err1,
732+
},
733+
{
734+
name: "pipe-then-error-then-finish-early",
735+
stages: []pipe.Stage{
736+
pipe.Function("pipe-error", genErr(io.ErrClosedPipe)),
737+
pipe.Function("err1", genErr(err1)),
738+
pipe.Function("finish-early", genErr(pipe.FinishEarly)),
739+
},
740+
expectedErr: err1,
741+
},
742+
} {
743+
tc := tc
744+
t.Run(tc.name, func(t *testing.T) {
745+
t.Parallel()
746+
747+
p := pipe.New()
748+
p.Add(tc.stages...)
749+
err := p.Run(ctx)
750+
if tc.expectedErr == nil {
751+
assert.NoError(t, err)
752+
} else {
753+
assert.ErrorIs(t, err, tc.expectedErr)
754+
}
755+
})
756+
}
757+
}
758+
577759
func BenchmarkSingleProgram(b *testing.B) {
578760
ctx := context.Background()
579761

@@ -662,3 +844,9 @@ func catFn(_ context.Context, _ pipe.Env, stdin io.Reader, stdout io.Writer) err
662844
_, err := io.Copy(stdout, stdin)
663845
return err
664846
}
847+
848+
func genErr(err error) pipe.StageFunc {
849+
return func(_ context.Context, _ pipe.Env, _ io.Reader, _ io.Writer) error {
850+
return err
851+
}
852+
}

0 commit comments

Comments
 (0)