Skip to content

Commit 5370ec2

Browse files
authored
Merge pull request #90 from github/add-pipe-package
Add an internal `pipe` package for managing process pipelines
2 parents 6641fab + c991c8e commit 5370ec2

File tree

14 files changed

+1941
-10
lines changed

14 files changed

+1941
-10
lines changed

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/cli/safeexec v1.0.0
77
github.com/davecgh/go-spew v1.1.1 // indirect
88
github.com/spf13/pflag v1.0.5
9-
github.com/stretchr/testify v1.4.0
10-
gopkg.in/yaml.v2 v2.2.7 // indirect
9+
github.com/stretchr/testify v1.7.0
10+
go.uber.org/goleak v1.1.12
11+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
1112
)

go.sum

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,53 @@
11
github.com/cli/safeexec v1.0.0 h1:0VngyaIyqACHdcMNWfo6+KdUYnqEr2Sg+bSP1pdF+dI=
22
github.com/cli/safeexec v1.0.0/go.mod h1:Z/D4tTN8Vs5gXYHDCbaM1S/anmEDnJb1iW0+EJ5zx3Q=
3-
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
43
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
54
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
65
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
7+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
8+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
9+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
10+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
711
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
812
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
913
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
1014
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
1115
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
12-
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
13-
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
14-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
16+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
17+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
18+
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
19+
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
20+
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
21+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
22+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
23+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
24+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
25+
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
26+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
27+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
28+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
29+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
30+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
31+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
32+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
33+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
34+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
35+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
36+
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
37+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
38+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
39+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
40+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
41+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
42+
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
43+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
44+
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
45+
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
46+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
47+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
48+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
1549
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
16-
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
17-
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
18-
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
19-
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
50+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
51+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
52+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
53+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/pipe/command.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package pipe
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"io"
8+
"os"
9+
"os/exec"
10+
"sync/atomic"
11+
"syscall"
12+
13+
"golang.org/x/sync/errgroup"
14+
)
15+
16+
// commandStage is a pipeline `Stage` based on running an external
17+
// command and piping the data through its stdin and stdout.
18+
type commandStage struct {
19+
name string
20+
stdin io.Closer
21+
cmd *exec.Cmd
22+
done chan struct{}
23+
wg errgroup.Group
24+
stderr bytes.Buffer
25+
26+
// If the context expired and we attempted to kill the command,
27+
// `ctx.Err()` is stored here.
28+
ctxErr atomic.Value
29+
}
30+
31+
// Command returns a pipeline `Stage` based on the specified external
32+
// `command`, run with the given command-line `args`. Its stdin and
33+
// stdout are handled as usual, and its stderr is collected and
34+
// included in any `*exec.ExitError` that the command might emit.
35+
func Command(command string, args ...string) Stage {
36+
if len(command) == 0 {
37+
panic("attempt to create command with empty command")
38+
}
39+
40+
cmd := exec.Command(command, args...)
41+
return CommandStage(command, cmd)
42+
}
43+
44+
// Command returns a pipeline `Stage` with the name `name`, based on
45+
// the specified `cmd`. Its stdin and stdout are handled as usual, and
46+
// its stderr is collected and included in any `*exec.ExitError` that
47+
// the command might emit.
48+
func CommandStage(name string, cmd *exec.Cmd) Stage {
49+
return &commandStage{
50+
name: name,
51+
cmd: cmd,
52+
done: make(chan struct{}),
53+
}
54+
}
55+
56+
func (s *commandStage) Name() string {
57+
return s.name
58+
}
59+
60+
func (s *commandStage) Start(
61+
ctx context.Context, env Env, stdin io.ReadCloser,
62+
) (io.ReadCloser, error) {
63+
if s.cmd.Dir == "" {
64+
s.cmd.Dir = env.Dir
65+
}
66+
67+
if stdin != nil {
68+
s.cmd.Stdin = stdin
69+
// Also keep a copy so that we can close it when the command exits:
70+
s.stdin = stdin
71+
}
72+
73+
stdout, err := s.cmd.StdoutPipe()
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
// If the caller hasn't arranged otherwise, read the command's
79+
// standard error into our `stderr` field:
80+
if s.cmd.Stderr == nil {
81+
// We can't just set `s.cmd.Stderr = &s.stderr`, because if we
82+
// do then `s.cmd.Wait()` doesn't wait to be sure that all
83+
// error output has been captured. By doing this ourselves, we
84+
// can be sure.
85+
p, err := s.cmd.StderrPipe()
86+
if err != nil {
87+
return nil, err
88+
}
89+
s.wg.Go(func() error {
90+
_, err := io.Copy(&s.stderr, p)
91+
// We don't consider `ErrClosed` an error (FIXME: is this
92+
// correct?):
93+
if err != nil && !errors.Is(err, os.ErrClosed) {
94+
return err
95+
}
96+
return nil
97+
})
98+
}
99+
100+
// Put the command in its own process group, if possible:
101+
s.runInOwnProcessGroup()
102+
103+
if err := s.cmd.Start(); err != nil {
104+
return nil, err
105+
}
106+
107+
// Arrange for the process to be killed (gently) if the context
108+
// expires before the command exits normally:
109+
go func() {
110+
select {
111+
case <-ctx.Done():
112+
s.kill(ctx.Err())
113+
case <-s.done:
114+
// Process already done; no need to kill anything.
115+
}
116+
}()
117+
118+
return stdout, nil
119+
}
120+
121+
// filterCmdError interprets `err`, which was returned by `Cmd.Wait()`
122+
// (possibly `nil`), possibly modifying it or ignoring it. It returns
123+
// the error that should actually be returned to the caller (possibly
124+
// `nil`).
125+
func (s *commandStage) filterCmdError(err error) error {
126+
if err == nil {
127+
return nil
128+
}
129+
130+
eErr, ok := err.(*exec.ExitError)
131+
if !ok {
132+
return err
133+
}
134+
135+
ctxErr, ok := s.ctxErr.Load().(error)
136+
if ok {
137+
// If the process looks like it was killed by us, substitute
138+
// `ctxErr` for the process's own exit error. Note that this
139+
// doesn't do anything on Windows, where the `Signaled()`
140+
// method isn't implemented (it is hardcoded to return
141+
// `false`).
142+
ps, ok := eErr.ProcessState.Sys().(syscall.WaitStatus)
143+
if ok && ps.Signaled() &&
144+
(ps.Signal() == syscall.SIGTERM || ps.Signal() == syscall.SIGKILL) {
145+
return ctxErr
146+
}
147+
}
148+
149+
eErr.Stderr = s.stderr.Bytes()
150+
return eErr
151+
}
152+
153+
func (s *commandStage) Wait() error {
154+
defer close(s.done)
155+
156+
// Make sure that any stderr is copied before `s.cmd.Wait()`
157+
// closes the read end of the pipe:
158+
wErr := s.wg.Wait()
159+
160+
err := s.cmd.Wait()
161+
err = s.filterCmdError(err)
162+
163+
if err == nil && wErr != nil {
164+
err = wErr
165+
}
166+
167+
if s.stdin != nil {
168+
cErr := s.stdin.Close()
169+
if cErr != nil && err == nil {
170+
return cErr
171+
}
172+
}
173+
174+
return err
175+
}

internal/pipe/command_unix.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
//go:build !windows
2+
// +build !windows
3+
4+
package pipe
5+
6+
import (
7+
"syscall"
8+
"time"
9+
)
10+
11+
// runInOwnProcessGroup arranges for `cmd` to be run in its own
12+
// process group.
13+
func (s *commandStage) runInOwnProcessGroup() {
14+
// Put the command in its own process group:
15+
if s.cmd.SysProcAttr == nil {
16+
s.cmd.SysProcAttr = &syscall.SysProcAttr{}
17+
}
18+
s.cmd.SysProcAttr.Setpgid = true
19+
}
20+
21+
// kill is called to kill the process if the context expires. `err` is
22+
// the corresponding value of `Context.Err()`.
23+
func (s *commandStage) kill(err error) {
24+
// I believe that the calls to `syscall.Kill()` in this method are
25+
// racy. It could be that s.cmd.Wait() succeeds immediately before
26+
// this call, in which case the process group wouldn't exist
27+
// anymore. But I don't see any way to avoid this without
28+
// duplicating a lot of code from `exec.Cmd`. (`os.Cmd.Kill()` and
29+
// `os.Cmd.Signal()` appear to be race-free, but only because they
30+
// use internal synchronization. But those methods only kill the
31+
// process, not the process group, so they are not suitable here.
32+
33+
// We started the process with PGID == PID:
34+
pid := s.cmd.Process.Pid
35+
select {
36+
case <-s.done:
37+
// Process has ended; no need to kill it again.
38+
return
39+
default:
40+
}
41+
42+
// Record the `ctx.Err()`, which will be used as the error result
43+
// for this stage.
44+
s.ctxErr.Store(err)
45+
46+
// First try to kill using a relatively gentle signal so that
47+
// the processes have a chance to clean up after themselves:
48+
_ = syscall.Kill(-pid, syscall.SIGTERM)
49+
50+
// Well-behaved processes should commit suicide after the above,
51+
// but if they don't exit within 2s, murder the whole lot of them:
52+
go func() {
53+
// Use an explicit `time.Timer` rather than `time.After()` so
54+
// that we can stop it (freeing resources) promptly if the
55+
// command exits before the timer triggers.
56+
timer := time.NewTimer(2 * time.Second)
57+
defer timer.Stop()
58+
59+
select {
60+
case <-s.done:
61+
// Process has ended; no need to kill it again.
62+
case <-timer.C:
63+
_ = syscall.Kill(-pid, syscall.SIGKILL)
64+
}
65+
}()
66+
}

internal/pipe/command_windows.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
//go:build windows
2+
// +build windows
3+
4+
package pipe
5+
6+
// runInOwnProcessGroup is not supported on Windows.
7+
func (s *commandStage) runInOwnProcessGroup() {}
8+
9+
// kill is called to kill the process if the context expires. `err` is
10+
// the corresponding value of `Context.Err()`.
11+
func (s *commandStage) kill(err error) {
12+
select {
13+
case <-s.done:
14+
// Process has ended; no need to kill it again.
15+
return
16+
default:
17+
}
18+
19+
// Record the `ctx.Err()`, which will be used as the error result
20+
// for this stage.
21+
s.ctxErr.Store(err)
22+
23+
s.cmd.Process.Kill()
24+
}

0 commit comments

Comments
 (0)