Skip to content

Commit 1458ae5

Browse files
committed
internal/pipe: new package for handling command pipelines
1 parent 6aa6890 commit 1458ae5

File tree

12 files changed

+1623
-10
lines changed

12 files changed

+1623
-10
lines changed

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/cli/safeexec v1.0.0
77
github.com/davecgh/go-spew v1.1.1 // indirect
88
github.com/spf13/pflag v1.0.5
9-
github.com/stretchr/testify v1.4.0
10-
gopkg.in/yaml.v2 v2.2.7 // indirect
9+
github.com/stretchr/testify v1.7.0
10+
go.uber.org/goleak v1.1.12
11+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
1112
)

go.sum

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,53 @@
11
github.com/cli/safeexec v1.0.0 h1:0VngyaIyqACHdcMNWfo6+KdUYnqEr2Sg+bSP1pdF+dI=
22
github.com/cli/safeexec v1.0.0/go.mod h1:Z/D4tTN8Vs5gXYHDCbaM1S/anmEDnJb1iW0+EJ5zx3Q=
3-
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
43
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
54
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
65
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
7+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
8+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
9+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
10+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
711
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
812
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
913
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
1014
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
1115
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
12-
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
13-
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
14-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
16+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
17+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
18+
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
19+
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
20+
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
21+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
22+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
23+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
24+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
25+
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
26+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
27+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
28+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
29+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
30+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
31+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
32+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
33+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
34+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
35+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
36+
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
37+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
38+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
39+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
40+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
41+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
42+
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
43+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
44+
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
45+
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
46+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
47+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
48+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
1549
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
16-
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
17-
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
18-
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
19-
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
50+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
51+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
52+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
53+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/pipe/command.go

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
package pipe
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"io"
8+
"os"
9+
"os/exec"
10+
"sync/atomic"
11+
"syscall"
12+
"time"
13+
14+
"golang.org/x/sync/errgroup"
15+
)
16+
17+
// commandStage is a pipeline `Stage` based on running an external
18+
// command and piping the data through its stdin and stdout.
19+
type commandStage struct {
20+
name string
21+
stdin io.Closer
22+
cmd *exec.Cmd
23+
done chan struct{}
24+
wg errgroup.Group
25+
stderr bytes.Buffer
26+
27+
// If the context expired and we attempted to kill the command,
28+
// `ctx.Err()` is stored here.
29+
ctxErr atomic.Value
30+
}
31+
32+
// Command returns a pipeline `Stage` based on the specified external
33+
// `command`, run with the given command-line `args`. Its stdin and
34+
// stdout are handled as usual, and its stderr is collected and
35+
// included in any `*exec.ExitError` that the command might emit.
36+
func Command(command string, args ...string) Stage {
37+
if len(command) == 0 {
38+
panic("attempt to create command with empty command")
39+
}
40+
41+
cmd := exec.Command(command, args...)
42+
return CommandStage(command, cmd)
43+
}
44+
45+
// Command returns a pipeline `Stage` with the name `name`, based on
46+
// the specified `cmd`. Its stdin and stdout are handled as usual, and
47+
// its stderr is collected and included in any `*exec.ExitError` that
48+
// the command might emit.
49+
func CommandStage(name string, cmd *exec.Cmd) Stage {
50+
return &commandStage{
51+
name: name,
52+
cmd: cmd,
53+
done: make(chan struct{}),
54+
}
55+
}
56+
57+
func (s *commandStage) Name() string {
58+
return s.name
59+
}
60+
61+
func (s *commandStage) Start(
62+
ctx context.Context, env Env, stdin io.ReadCloser,
63+
) (io.ReadCloser, error) {
64+
if s.cmd.Dir == "" {
65+
s.cmd.Dir = env.Dir
66+
}
67+
68+
if stdin != nil {
69+
s.cmd.Stdin = stdin
70+
// Also keep a copy so that we can close it when the command exits:
71+
s.stdin = stdin
72+
}
73+
74+
stdout, err := s.cmd.StdoutPipe()
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
// If the caller hasn't arranged otherwise, read the command's
80+
// standard error into our `stderr` field:
81+
if s.cmd.Stderr == nil {
82+
// We can't just set `s.cmd.Stderr = &s.stderr`, because if we
83+
// do then `s.cmd.Wait()` doesn't wait to be sure that all
84+
// error output has been captured. By doing this ourselves, we
85+
// can be sure.
86+
p, err := s.cmd.StderrPipe()
87+
if err != nil {
88+
return nil, err
89+
}
90+
s.wg.Go(func() error {
91+
_, err := io.Copy(&s.stderr, p)
92+
// We don't consider `ErrClosed` an error (FIXME: is this
93+
// correct?):
94+
if err != nil && !errors.Is(err, os.ErrClosed) {
95+
return err
96+
}
97+
return nil
98+
})
99+
}
100+
101+
// Put the command in its own process group:
102+
if s.cmd.SysProcAttr == nil {
103+
s.cmd.SysProcAttr = &syscall.SysProcAttr{}
104+
}
105+
s.cmd.SysProcAttr.Setpgid = true
106+
107+
if err := s.cmd.Start(); err != nil {
108+
return nil, err
109+
}
110+
111+
// Arrange for the process to be killed (gently) if the context
112+
// expires before the command exits normally:
113+
go func() {
114+
select {
115+
case <-ctx.Done():
116+
s.kill(ctx.Err())
117+
case <-s.done:
118+
// Process already done; no need to kill anything.
119+
}
120+
}()
121+
122+
return stdout, nil
123+
}
124+
125+
// kill is called to kill the process if the context expires. `err` is
126+
// the corresponding value of `Context.Err()`.
127+
func (s *commandStage) kill(err error) {
128+
// I believe that the calls to `syscall.Kill()` in this method are
129+
// racy. It could be that s.cmd.Wait() succeeds immediately before
130+
// this call, in which case the process group wouldn't exist
131+
// anymore. But I don't see any way to avoid this without
132+
// duplicating a lot of code from `exec.Cmd`. (`os.Cmd.Kill()` and
133+
// `os.Cmd.Signal()` appear to be race-free, but only because they
134+
// use internal synchronization. But those methods only kill the
135+
// process, not the process group, so they are not suitable here.
136+
137+
// We started the process with PGID == PID:
138+
pid := s.cmd.Process.Pid
139+
select {
140+
case <-s.done:
141+
// Process has ended; no need to kill it again.
142+
return
143+
default:
144+
}
145+
146+
// Record the `ctx.Err()`, which will be used as the error result
147+
// for this stage.
148+
s.ctxErr.Store(err)
149+
150+
// First try to kill using a relatively gentle signal so that
151+
// the processes have a chance to clean up after themselves:
152+
_ = syscall.Kill(-pid, syscall.SIGTERM)
153+
154+
// Well-behaved processes should commit suicide after the above,
155+
// but if they don't exit within 2s, murder the whole lot of them:
156+
go func() {
157+
// Use an explicit `time.Timer` rather than `time.After()` so
158+
// that we can stop it (freeing resources) promptly if the
159+
// command exits before the timer triggers.
160+
timer := time.NewTimer(2 * time.Second)
161+
defer timer.Stop()
162+
163+
select {
164+
case <-s.done:
165+
// Process has ended; no need to kill it again.
166+
case <-timer.C:
167+
_ = syscall.Kill(-pid, syscall.SIGKILL)
168+
}
169+
}()
170+
}
171+
172+
// filterCmdError interprets `err`, which was returned by `Cmd.Wait()`
173+
// (possibly `nil`), possibly modifying it or ignoring it. It returns
174+
// the error that should actually be returned to the caller (possibly
175+
// `nil`).
176+
func (s *commandStage) filterCmdError(err error) error {
177+
if err == nil {
178+
return nil
179+
}
180+
181+
eErr, ok := err.(*exec.ExitError)
182+
if !ok {
183+
return err
184+
}
185+
186+
ctxErr, ok := s.ctxErr.Load().(error)
187+
if ok {
188+
// If the process looks like it was killed by us, substitute
189+
// `ctxErr` for the process's own exit error.
190+
ps, ok := eErr.ProcessState.Sys().(syscall.WaitStatus)
191+
if ok && ps.Signaled() &&
192+
(ps.Signal() == syscall.SIGTERM || ps.Signal() == syscall.SIGKILL) {
193+
return ctxErr
194+
}
195+
}
196+
197+
eErr.Stderr = s.stderr.Bytes()
198+
return eErr
199+
}
200+
201+
func (s *commandStage) Wait() error {
202+
defer close(s.done)
203+
204+
// Make sure that any stderr is copied before `s.cmd.Wait()`
205+
// closes the read end of the pipe:
206+
wErr := s.wg.Wait()
207+
208+
err := s.cmd.Wait()
209+
err = s.filterCmdError(err)
210+
211+
if err == nil && wErr != nil {
212+
err = wErr
213+
}
214+
215+
if s.stdin != nil {
216+
cErr := s.stdin.Close()
217+
if cErr != nil && err == nil {
218+
return cErr
219+
}
220+
}
221+
222+
return err
223+
}

0 commit comments

Comments
 (0)