Skip to content

Commit 7171874

Browse files
authored
Added process.Background() and process.Forwarded() (#804)
## Changes This PR adds higher-level wrappers for calling subprocesses. One of the steps to get #637 in, as previously discussed. The reason to add `process.Forwarded()` is to proxy Python's `input()` calls from a child process seamlessly. Another use-case is plugging in `less` as a pager for the list results. ## Tests `make test`
1 parent 3ee89c4 commit 7171874

File tree

13 files changed

+390
-24
lines changed

13 files changed

+390
-24
lines changed

bundle/config/artifact.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"os/exec"
87
"path"
98
"strings"
109

1110
"github.com/databricks/cli/bundle/config/paths"
11+
"github.com/databricks/cli/libs/process"
1212
"github.com/databricks/databricks-sdk-go/service/compute"
1313
)
1414

@@ -56,13 +56,14 @@ func (a *Artifact) Build(ctx context.Context) ([]byte, error) {
5656
commands := strings.Split(a.BuildCommand, " && ")
5757
for _, command := range commands {
5858
buildParts := strings.Split(command, " ")
59-
cmd := exec.CommandContext(ctx, buildParts[0], buildParts[1:]...)
60-
cmd.Dir = a.Path
61-
res, err := cmd.CombinedOutput()
59+
var buf bytes.Buffer
60+
_, err := process.Background(ctx, buildParts,
61+
process.WithCombinedOutput(&buf),
62+
process.WithDir(a.Path))
6263
if err != nil {
63-
return res, err
64+
return buf.Bytes(), err
6465
}
65-
out = append(out, res)
66+
out = append(out, buf.Bytes())
6667
}
6768
return bytes.Join(out, []byte{}), nil
6869
}

bundle/scripts/scripts.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func executeHook(ctx context.Context, b *bundle.Bundle, hook config.ScriptHook)
6161
return nil, nil, err
6262
}
6363

64+
// TODO: switch to process.Background(...)
6465
cmd := exec.CommandContext(ctx, interpreter, "-c", string(command))
6566
cmd.Dir = b.Config.Path
6667

libs/env/context.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package env
33
import (
44
"context"
55
"os"
6+
"strings"
67
)
78

89
var envContextKey int
@@ -61,3 +62,21 @@ func Set(ctx context.Context, key, value string) context.Context {
6162
m[key] = value
6263
return setMap(ctx, m)
6364
}
65+
66+
// All returns environment variables that are defined in both os.Environ
67+
// and this package. `env.Set(ctx, x, y)` will override x from os.Environ.
68+
func All(ctx context.Context) map[string]string {
69+
m := map[string]string{}
70+
for _, line := range os.Environ() {
71+
split := strings.SplitN(line, "=", 2)
72+
if len(split) != 2 {
73+
continue
74+
}
75+
m[split[0]] = split[1]
76+
}
77+
// override existing environment variables with the ones we set
78+
for k, v := range getMap(ctx) {
79+
m[k] = v
80+
}
81+
return m
82+
}

libs/env/context_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,12 @@ func TestContext(t *testing.T) {
3838
assert.Equal(t, "qux", Get(ctx2, "FOO"))
3939
assert.Equal(t, "baz", Get(ctx1, "FOO"))
4040
assert.Equal(t, "bar", Get(ctx0, "FOO"))
41+
42+
ctx3 := Set(ctx2, "BAR", "x=y")
43+
44+
all := All(ctx3)
45+
assert.NotNil(t, all)
46+
assert.Equal(t, "qux", all["FOO"])
47+
assert.Equal(t, "x=y", all["BAR"])
48+
assert.NotEmpty(t, all["PATH"])
4149
}

libs/git/clone.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package git
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
87
"os/exec"
98
"regexp"
109
"strings"
10+
11+
"github.com/databricks/cli/libs/process"
1112
)
1213

1314
// source: https://stackoverflow.com/questions/59081778/rules-for-special-characters-in-github-repository-name
@@ -42,23 +43,17 @@ func (opts cloneOptions) args() []string {
4243
}
4344

4445
func (opts cloneOptions) clone(ctx context.Context) error {
45-
cmd := exec.CommandContext(ctx, "git", opts.args()...)
46-
var cmdErr bytes.Buffer
47-
cmd.Stderr = &cmdErr
48-
49-
// start git clone
50-
err := cmd.Start()
46+
// start and wait for git clone to complete
47+
_, err := process.Background(ctx, append([]string{"git"}, opts.args()...))
5148
if errors.Is(err, exec.ErrNotFound) {
5249
return fmt.Errorf("please install git CLI to clone a repository: %w", err)
5350
}
54-
if err != nil {
55-
return fmt.Errorf("git clone failed: %w", err)
51+
var processErr *process.ProcessError
52+
if errors.As(err, &processErr) {
53+
return fmt.Errorf("git clone failed: %w. %s", err, processErr.Stderr)
5654
}
57-
58-
// wait for git clone to complete
59-
err = cmd.Wait()
6055
if err != nil {
61-
return fmt.Errorf("git clone failed: %w. %s", err, cmdErr.String())
56+
return fmt.Errorf("git clone failed: %w", err)
6257
}
6358
return nil
6459
}

libs/process/background.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package process
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"os/exec"
8+
"strings"
9+
10+
"github.com/databricks/cli/libs/env"
11+
"github.com/databricks/cli/libs/log"
12+
)
13+
14+
type ProcessError struct {
15+
Command string
16+
Err error
17+
Stdout string
18+
Stderr string
19+
}
20+
21+
func (perr *ProcessError) Unwrap() error {
22+
return perr.Err
23+
}
24+
25+
func (perr *ProcessError) Error() string {
26+
return fmt.Sprintf("%s: %s", perr.Command, perr.Err)
27+
}
28+
29+
func Background(ctx context.Context, args []string, opts ...execOption) (string, error) {
30+
commandStr := strings.Join(args, " ")
31+
log.Debugf(ctx, "running: %s", commandStr)
32+
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
33+
stdout := bytes.Buffer{}
34+
stderr := bytes.Buffer{}
35+
// For background processes, there's no standard input
36+
cmd.Stdin = nil
37+
cmd.Stdout = &stdout
38+
cmd.Stderr = &stderr
39+
// we pull the env through lib/env such that we can run
40+
// parallel tests with anything using libs/process.
41+
for k, v := range env.All(ctx) {
42+
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
43+
}
44+
for _, o := range opts {
45+
err := o(ctx, cmd)
46+
if err != nil {
47+
return "", err
48+
}
49+
}
50+
if err := cmd.Run(); err != nil {
51+
return stdout.String(), &ProcessError{
52+
Err: err,
53+
Command: commandStr,
54+
Stdout: stdout.String(),
55+
Stderr: stderr.String(),
56+
}
57+
}
58+
return stdout.String(), nil
59+
}

libs/process/background_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package process
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"os"
8+
"os/exec"
9+
"strings"
10+
"testing"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestBackgroundUnwrapsNotFound(t *testing.T) {
16+
ctx := context.Background()
17+
_, err := Background(ctx, []string{"/bin/meeecho", "1"})
18+
assert.ErrorIs(t, err, os.ErrNotExist)
19+
}
20+
21+
func TestBackground(t *testing.T) {
22+
ctx := context.Background()
23+
res, err := Background(ctx, []string{"echo", "1"}, WithDir("/"))
24+
assert.NoError(t, err)
25+
assert.Equal(t, "1", strings.TrimSpace(res))
26+
}
27+
28+
func TestBackgroundOnlyStdoutGetsoutOnSuccess(t *testing.T) {
29+
ctx := context.Background()
30+
res, err := Background(ctx, []string{
31+
"python3", "-c", "import sys; sys.stderr.write('1'); sys.stdout.write('2')",
32+
})
33+
assert.NoError(t, err)
34+
assert.Equal(t, "2", res)
35+
}
36+
37+
func TestBackgroundCombinedOutput(t *testing.T) {
38+
ctx := context.Background()
39+
buf := bytes.Buffer{}
40+
res, err := Background(ctx, []string{
41+
"python3", "-c", "import sys, time; " +
42+
`sys.stderr.write("1\n"); sys.stderr.flush(); ` +
43+
"time.sleep(0.001); " +
44+
"print('2', flush=True); sys.stdout.flush(); " +
45+
"time.sleep(0.001)",
46+
}, WithCombinedOutput(&buf))
47+
assert.NoError(t, err)
48+
assert.Equal(t, "2", strings.TrimSpace(res))
49+
assert.Equal(t, "1\n2\n", strings.ReplaceAll(buf.String(), "\r", ""))
50+
}
51+
52+
func TestBackgroundCombinedOutputFailure(t *testing.T) {
53+
ctx := context.Background()
54+
buf := bytes.Buffer{}
55+
res, err := Background(ctx, []string{
56+
"python3", "-c", "import sys, time; " +
57+
`sys.stderr.write("1\n"); sys.stderr.flush(); ` +
58+
"time.sleep(0.001); " +
59+
"print('2', flush=True); sys.stdout.flush(); " +
60+
"time.sleep(0.001); " +
61+
"sys.exit(42)",
62+
}, WithCombinedOutput(&buf))
63+
var processErr *ProcessError
64+
if assert.ErrorAs(t, err, &processErr) {
65+
assert.Equal(t, "1", strings.TrimSpace(processErr.Stderr))
66+
assert.Equal(t, "2", strings.TrimSpace(processErr.Stdout))
67+
}
68+
assert.Equal(t, "2", strings.TrimSpace(res))
69+
assert.Equal(t, "1\n2\n", strings.ReplaceAll(buf.String(), "\r", ""))
70+
}
71+
72+
func TestBackgroundNoStdin(t *testing.T) {
73+
ctx := context.Background()
74+
res, err := Background(ctx, []string{"cat"})
75+
assert.NoError(t, err)
76+
assert.Equal(t, "", res)
77+
}
78+
79+
func TestBackgroundFails(t *testing.T) {
80+
ctx := context.Background()
81+
_, err := Background(ctx, []string{"ls", "/dev/null/x"})
82+
assert.NotNil(t, err)
83+
}
84+
85+
func TestBackgroundFailsOnOption(t *testing.T) {
86+
ctx := context.Background()
87+
_, err := Background(ctx, []string{"ls", "/dev/null/x"}, func(_ context.Context, c *exec.Cmd) error {
88+
return fmt.Errorf("nope")
89+
})
90+
assert.EqualError(t, err, "nope")
91+
}

libs/process/forwarded.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package process
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"os/exec"
8+
"strings"
9+
10+
"github.com/databricks/cli/libs/env"
11+
"github.com/databricks/cli/libs/log"
12+
)
13+
14+
func Forwarded(ctx context.Context, args []string, src io.Reader, outWriter, errWriter io.Writer, opts ...execOption) error {
15+
commandStr := strings.Join(args, " ")
16+
log.Debugf(ctx, "starting: %s", commandStr)
17+
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
18+
19+
// empirical tests showed buffered copies being more responsive
20+
cmd.Stdout = outWriter
21+
cmd.Stderr = errWriter
22+
cmd.Stdin = src
23+
// we pull the env through lib/env such that we can run
24+
// parallel tests with anything using libs/process.
25+
for k, v := range env.All(ctx) {
26+
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
27+
}
28+
29+
// apply common options
30+
for _, o := range opts {
31+
err := o(ctx, cmd)
32+
if err != nil {
33+
return err
34+
}
35+
}
36+
37+
err := cmd.Start()
38+
if err != nil {
39+
return err
40+
}
41+
42+
return cmd.Wait()
43+
}

libs/process/forwarded_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package process
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"os/exec"
7+
"strings"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestForwarded(t *testing.T) {
14+
ctx := context.Background()
15+
var buf bytes.Buffer
16+
err := Forwarded(ctx, []string{
17+
"python3", "-c", "print(input('input: '))",
18+
}, strings.NewReader("abc\n"), &buf, &buf)
19+
assert.NoError(t, err)
20+
21+
assert.Equal(t, "input: abc", strings.TrimSpace(buf.String()))
22+
}
23+
24+
func TestForwardedFails(t *testing.T) {
25+
ctx := context.Background()
26+
var buf bytes.Buffer
27+
err := Forwarded(ctx, []string{
28+
"_non_existent_",
29+
}, strings.NewReader("abc\n"), &buf, &buf)
30+
assert.NotNil(t, err)
31+
}
32+
33+
func TestForwardedFailsOnStdinPipe(t *testing.T) {
34+
ctx := context.Background()
35+
var buf bytes.Buffer
36+
err := Forwarded(ctx, []string{
37+
"_non_existent_",
38+
}, strings.NewReader("abc\n"), &buf, &buf, func(_ context.Context, c *exec.Cmd) error {
39+
c.Stdin = strings.NewReader("x")
40+
return nil
41+
})
42+
assert.NotNil(t, err)
43+
}

0 commit comments

Comments
 (0)