Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit c465c19

Browse files
author
David Chung
authored
exec pipeline (#462)
Signed-off-by: David Chung <[email protected]>
1 parent 02d6425 commit c465c19

File tree

6 files changed

+195
-137
lines changed

6 files changed

+195
-137
lines changed

pkg/leader/etcd/v3/etcd_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestWithRealEtcd(t *testing.T) {
3333
}
3434
}
3535

36-
defer etcd.StopContainer.Run(containerName)
36+
defer etcd.StopContainer.Start(containerName)
3737

3838
t.Run("AmILeader", testAmILeader)
3939
}

pkg/store/etcd/v3/etcd_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestWithRealEtcd(t *testing.T) {
3434
}
3535
}
3636

37-
defer etcd.StopContainer.Run(containerName)
37+
defer etcd.StopContainer.Start(containerName)
3838

3939
t.Run("SaveLoad", testSaveLoad)
4040
}

pkg/util/etcd/v3/etcd.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func LocalIP() string {
6262
var (
6363

6464
// RunContainer is a command that shells out to Docker to run the etcd server in a container
65-
RunContainer exec.Command = `
65+
RunContainer = exec.Command(`
6666
docker run --rm -d \
6767
-v /usr/share/ca-certificates/:/etc/ssl/certs \
6868
-p 4001:4001 \
@@ -78,18 +78,21 @@ docker run --rm -d \
7878
-listen-peer-urls http://0.0.0.0:2380 \
7979
-initial-cluster-token etcd-cluster-1 \
8080
-initial-cluster etcd0=http://{{ arg 1 }}:2380 \
81-
-initial-cluster-state new`
81+
-initial-cluster-state new
82+
`)
8283

8384
// StopContainer stops the etcd container
84-
StopContainer exec.Command = `docker stop {{ arg 1 }}`
85+
StopContainer = exec.Command(`docker stop {{ arg 1 }}`)
8586

8687
// LsMembers lists the members in the cluster
87-
LsMembers exec.Command = `
88+
LsMembers = exec.Command(`
8889
docker run --rm -e ETCDCTL_API=3 \
89-
quay.io/coreos/etcd etcdctl --endpoints={{ arg 1 }}:2379 member list`
90+
quay.io/coreos/etcd etcdctl --endpoints={{ arg 1 }}:2379 member list
91+
`)
9092

9193
// Get fetches a value via etcdctl
92-
Get exec.Command = `
94+
Get = exec.Command(`
9395
docker run --rm -e ETCDCTL_API=3 \
94-
quay.io/coreos/etcd etcdctl --endpoints={{ arg 1 }}:2379 get --print-value-only {{ arg 2 }}`
96+
quay.io/coreos/etcd etcdctl --endpoints={{ arg 1 }}:2379 get --print-value-only {{ arg 2 }}
97+
`)
9598
)

pkg/util/etcd/v3/etcd_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,6 @@ func TestRunInContainer(t *testing.T) {
9090
}
9191

9292
log.Infoln("Stopping etcd")
93-
err = StopContainer.Run(containerName)
93+
err = StopContainer.Start(containerName)
9494
require.NoError(t, err)
9595
}

pkg/util/exec/exec.go

Lines changed: 108 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package exec
22

33
import (
4+
"fmt"
45
"io"
56
"os"
67
"os/exec"
@@ -12,75 +13,41 @@ import (
1213

1314
var log = logutil.New("module", "util/exec")
1415

15-
// Command is the template which is rendered before it's executed
16-
type Command string
17-
18-
// Output runs the command until completion and returns the results
19-
func (c Command) Output(args ...string) ([]byte, error) {
20-
return c.builder().Output(args...)
21-
}
22-
23-
// Start runs the command without blocking
24-
func (c Command) Start(args ...string) error {
25-
return c.builder().Start(args...)
26-
}
27-
28-
// Run does a Cmd.Run on the command
29-
func (c Command) Run(args ...string) error {
30-
return c.builder().Run(args...)
31-
}
32-
33-
// String returns the interpolated version of the command
34-
func (c Command) String(args ...string) (string, error) {
35-
p, err := c.builder().generate(args...)
36-
if err == nil {
37-
return strings.Join(p, " "), nil
16+
// Command returns a fluent builder for running a command where the command string
17+
// can have template functions and arguments
18+
func Command(s string) *Builder {
19+
return &Builder{
20+
command: s,
21+
funcs: map[string]interface{}{},
22+
args: map[interface{}]interface{}{},
3823
}
39-
return string(c), err
40-
}
41-
42-
// WithOptions adds the template options
43-
func (c Command) WithOptions(options template.Options) *Builder {
44-
return c.builder().WithOptions(options)
45-
}
46-
47-
// WithFunc adds a function that can be used in the template
48-
func (c Command) WithFunc(name string, function interface{}) *Builder {
49-
return c.builder().WithFunc(name, function)
50-
}
51-
52-
// WithContext sets the context for the template
53-
func (c Command) WithContext(context interface{}) *Builder {
54-
return c.builder().WithContext(context)
55-
}
56-
57-
// InheritEnvs determines whether the process should inherit the envs of the parent
58-
func (c Command) InheritEnvs(v bool) *Builder {
59-
return c.builder().InheritEnvs(v)
60-
}
61-
62-
// NewCommand creates an instance of the command builder to allow detailed configuration
63-
func NewCommand(s string) *Builder {
64-
return Command(s).builder()
6524
}
6625

6726
// Builder collects options until it's run
6827
type Builder struct {
69-
command Command
28+
command string
7029
options template.Options
7130
inheritEnvs bool
7231
envs []string
7332
funcs map[string]interface{}
33+
args map[interface{}]interface{}
7434
context interface{}
7535
rendered string // rendered command string
7636
cmd *exec.Cmd
7737
}
7838

79-
func (c Command) builder() *Builder {
80-
return &Builder{
81-
command: c,
82-
funcs: map[string]interface{}{},
39+
// WithArg sets the arg key, value pair that can be accessed via the 'arg' function
40+
func (b *Builder) WithArg(key string, value interface{}) *Builder {
41+
b.args[key] = value
42+
return b
43+
}
44+
45+
// WithArgs adds the command line args array
46+
func (b *Builder) WithArgs(args ...interface{}) *Builder {
47+
for i, arg := range args {
48+
b.args[i+1] = arg
8349
}
50+
return b
8451
}
8552

8653
// InheritEnvs determines whether the process should inherit the envs of the parent
@@ -113,24 +80,6 @@ func (b *Builder) WithContext(context interface{}) *Builder {
11380
return b
11481
}
11582

116-
// Output runs the command until completion and returns the results
117-
func (b *Builder) Output(args ...string) ([]byte, error) {
118-
run, err := b.exec(args...)
119-
if err != nil {
120-
return nil, err
121-
}
122-
return run.Output()
123-
}
124-
125-
// Start runs the command without blocking
126-
func (b *Builder) Start(args ...string) error {
127-
run, err := b.exec(args...)
128-
if err != nil {
129-
return err
130-
}
131-
return run.Start()
132-
}
133-
13483
// Step is something you do with the processes streams
13584
type Step func(stdin io.WriteCloser, stdout io.ReadCloser, stderr io.ReadCloser) error
13685

@@ -154,10 +103,10 @@ func (t *Thenable) Then(then Step) *Thenable {
154103

155104
// Done returns the final function
156105
func (t *Thenable) Done() Step {
157-
steps := t.steps
106+
all := t.steps
158107
return func(stdin io.WriteCloser, stdout, stderr io.ReadCloser) error {
159-
for _, step := range steps {
160-
if err := step(stdin, stdout, stderr); err != nil {
108+
for _, next := range all {
109+
if err := next(stdin, stdout, stderr); err != nil {
161110
return err
162111
}
163112
}
@@ -200,57 +149,89 @@ func MergeOutput(out io.Writer) Step {
200149

201150
// StartWithStreams starts the the process and then calls the function which allows
202151
// the streams to be wired. Calling the provided function blocks.
203-
func (b *Builder) StartWithStreams(f Step,
204-
args ...string) error {
152+
func (b *Builder) StartWithStreams(f Step, args ...interface{}) error {
205153

206-
_, err := b.exec(args...)
207-
if err != nil {
154+
if err := b.Prepare(args...); err != nil {
208155
return err
209156
}
210157

211-
pOut, err := b.cmd.StdoutPipe()
212-
if err != nil {
213-
return err
214-
}
215-
pErr, err := b.cmd.StderrPipe()
216-
if err != nil {
217-
return err
158+
run := func() error { return nil }
159+
if f != nil {
160+
pIn, err := b.cmd.StdinPipe()
161+
if err != nil {
162+
return err
163+
}
164+
165+
pOut, err := b.cmd.StdoutPipe()
166+
if err != nil {
167+
return err
168+
}
169+
170+
pErr, err := b.cmd.StderrPipe()
171+
if err != nil {
172+
return err
173+
}
174+
175+
run = func() error {
176+
return f(pIn, pOut, pErr)
177+
}
218178
}
219-
pIn, err := b.cmd.StdinPipe()
220-
if err != nil {
179+
180+
if err := b.cmd.Start(); err != nil {
221181
return err
222182
}
223183

224-
err = b.cmd.Start()
225-
if err != nil {
184+
return run()
185+
}
186+
187+
// Start does a Cmd.Start on the command
188+
func (b *Builder) Start(args ...interface{}) error {
189+
if err := b.Prepare(args...); err != nil {
226190
return err
227191
}
192+
return b.StartWithStreams(nil, args...)
193+
}
228194

229-
return f(pIn, pOut, pErr)
195+
// Wait waits for the command to complete
196+
func (b *Builder) Wait() error {
197+
return b.cmd.Wait()
230198
}
231199

232-
// Run does a Cmd.Run on the command
233-
func (b *Builder) Run(args ...string) error {
234-
run, err := b.exec(args...)
235-
if err != nil {
236-
return err
200+
// Output runs the command until completion and returns the results
201+
func (b *Builder) Output(args ...interface{}) ([]byte, error) {
202+
if err := b.Prepare(args...); err != nil {
203+
return nil, err
237204
}
238-
return run.Run()
205+
return b.cmd.Output()
239206
}
240207

241-
func (b *Builder) generate(args ...string) ([]string, error) {
208+
func (b *Builder) generate(args ...interface{}) ([]string, error) {
209+
// also index the args by index
210+
for i, v := range args {
211+
b.args[i+1] = v
212+
}
213+
242214
ct, err := template.NewTemplate("str://"+string(b.command), template.Options{})
243215
if err != nil {
244216
return nil, err
245217
}
246218
for k, v := range b.funcs {
247219
ct.AddFunc(k, v)
248220
}
249-
ct.AddFunc("arg", func(i int) interface{} {
250-
return args[i-1] // starts at 1
221+
ct.AddFunc("arg", func(i interface{}) interface{} {
222+
if i, is := i.(int); is {
223+
if len(args) > i {
224+
return args[i-1] // starts at 1
225+
}
226+
}
227+
return b.args[i]
251228
})
252229
ct.AddFunc("argv", func() interface{} {
253-
return args
230+
argv := []string{}
231+
for _, arg := range args {
232+
argv = append(argv, fmt.Sprintf("%v", arg))
233+
}
234+
return argv
254235
})
255236
cmd, err := ct.Render(b.context)
256237
if err != nil {
@@ -267,16 +248,39 @@ func (b *Builder) generate(args ...string) ([]string, error) {
267248
}
268249
return command, nil
269250
}
270-
func (b *Builder) exec(args ...string) (*exec.Cmd, error) {
251+
252+
// Prepare generates the command based on the input args. This is the step before actual Start or Run
253+
func (b *Builder) Prepare(args ...interface{}) error {
271254
command, err := b.generate(args...)
272255
if err != nil {
273-
return nil, err
256+
return err
274257
}
275258
log.Debug("exec", "command", command)
276259
b.cmd = exec.Command(command[0], command[1:]...)
277260
if b.inheritEnvs {
278261
b.cmd.Env = append(os.Environ(), b.envs...)
279262
}
263+
return nil
264+
}
265+
266+
// Stdin takes the input from the writer
267+
func (b *Builder) Stdin(f func(w io.Writer) error) error {
268+
input, err := b.cmd.StdinPipe()
269+
if err != nil {
270+
return err
271+
}
272+
defer input.Close()
273+
return f(input)
274+
}
275+
276+
// StdoutTo connects the stdout of this to the next stage
277+
func (b *Builder) StdoutTo(next *Builder) {
278+
r, w := io.Pipe()
279+
b.cmd.Stdout = w
280+
next.cmd.Stdin = r
281+
}
280282

281-
return b.cmd, nil
283+
// Stdout sets the stdout
284+
func (b *Builder) Stdout(w io.Writer) {
285+
b.cmd.Stdout = w
282286
}

0 commit comments

Comments
 (0)