Skip to content

Commit e822b6a

Browse files
authored
Add asyncexec package to simplify os/exec output streaming (#15)
Issue N/A Description of changes: - Add `asyncexec` package to simplify `os/exec.Command` output streaming By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 parent db0c754 commit e822b6a

File tree

4 files changed

+210
-1
lines changed

4 files changed

+210
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ install: build
1717
cp ./bin/ackdev $(shell go env GOPATH)/bin/ackdev
1818

1919
test:
20-
go test -v ./...
20+
go test -tags $(shell go env GOOS) -v ./...
2121

2222
.PHONY: test install mocks
2323

pkg/asyncexec/cmd.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package asyncexec
2+
3+
import (
4+
"bufio"
5+
"os/exec"
6+
)
7+
8+
// New instantiate a new Cmd object.
9+
func New(cmd *exec.Cmd, buff int) *Cmd {
10+
return &Cmd{
11+
cmd: cmd,
12+
stopCh: make(chan struct{}),
13+
stdoutCh: make(chan []byte, buff),
14+
stderrCh: make(chan []byte, buff),
15+
}
16+
}
17+
18+
// Cmd is a wrapper arround exec.Cmd. Mainly used to execute
19+
// command asynchronously with/or without output stream.
20+
type Cmd struct {
21+
cmd *exec.Cmd
22+
23+
stopCh chan struct{}
24+
stdoutCh chan []byte
25+
stderrCh chan []byte
26+
}
27+
28+
// Run runs the command. if streamOutput is true, it will spin
29+
// two goroutine responsible of streaming the stdout and stderr
30+
func (c *Cmd) Run() error {
31+
cmdStdoutReader, err := c.cmd.StdoutPipe()
32+
if err != nil {
33+
return err
34+
}
35+
stdoutScanner := bufio.NewScanner(cmdStdoutReader)
36+
37+
cmdStderrReader, err := c.cmd.StderrPipe()
38+
if err != nil {
39+
return err
40+
}
41+
stderrScanner := bufio.NewScanner(cmdStderrReader)
42+
43+
// Goroutine for stdout
44+
go func() {
45+
defer close(c.stdoutCh)
46+
for stdoutScanner.Scan() {
47+
bytes := stdoutScanner.Bytes()
48+
c.stdoutCh <- bytes
49+
}
50+
}()
51+
52+
// Goroutine for stderr
53+
go func() {
54+
defer close(c.stderrCh)
55+
for stderrScanner.Scan() {
56+
bytes := stderrScanner.Bytes()
57+
c.stderrCh <- bytes
58+
}
59+
}()
60+
61+
err = c.cmd.Start()
62+
if err != nil {
63+
return err
64+
}
65+
66+
// listening for stop signal
67+
go func() {
68+
<-c.stopCh
69+
c.cmd.Process.Kill()
70+
}()
71+
72+
return nil
73+
}
74+
75+
// Exited returns true if the command exited, false otherwise.
76+
func (c *Cmd) Exited() bool {
77+
return c.cmd.ProcessState.Exited()
78+
}
79+
80+
// ExitCode returns the command process exit code.
81+
func (c *Cmd) ExitCode() int {
82+
return c.cmd.ProcessState.ExitCode()
83+
}
84+
85+
// StdoutStream returns a channel streaming the command Stdout.
86+
func (c *Cmd) StdoutStream() <-chan []byte {
87+
return c.stdoutCh
88+
}
89+
90+
// StderrStream returns a channel streaming the command Stderr.
91+
func (c *Cmd) StderrStream() <-chan []byte {
92+
return c.stderrCh
93+
}
94+
95+
// Wait blocks until the command exits
96+
func (c *Cmd) Wait() error {
97+
return c.cmd.Wait()
98+
}
99+
100+
// Stop signals the Wrapper to kill the process running the command.
101+
func (c *Cmd) Stop() {
102+
c.stopCh <- struct{}{}
103+
}

pkg/asyncexec/cmd_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// +build !windows
2+
3+
package asyncexec_test
4+
5+
import (
6+
"fmt"
7+
"os/exec"
8+
9+
"github.com/aws-controllers-k8s/dev-tools/pkg/asyncexec"
10+
)
11+
12+
func ExampleCmd_Run_withNoStream() {
13+
cmd := asyncexec.New(exec.Command("echo", "Hello ACK"), 16)
14+
cmd.Run()
15+
cmd.Wait()
16+
fmt.Println(cmd.ExitCode())
17+
// Output: 0
18+
}
19+
20+
func ExampleCmd_Run_withStream() {
21+
cmd := asyncexec.New(exec.Command("echo", "Hello ACK"), 16)
22+
cmd.Run()
23+
24+
done := make(chan struct{})
25+
go func() {
26+
for b := range cmd.StdoutStream() {
27+
fmt.Println(string(b))
28+
29+
}
30+
done <- struct{}{}
31+
}()
32+
go func() {
33+
for b := range cmd.StderrStream() {
34+
fmt.Println(string(b))
35+
36+
}
37+
done <- struct{}{}
38+
}()
39+
40+
defer func() { _, _ = <-done, <-done }()
41+
42+
cmd.Wait()
43+
// Output: Hello ACK
44+
}

pkg/asyncexec/stream.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package asyncexec
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"os/exec"
7+
)
8+
9+
// StreamCommand executes a given command in a context directory and streams
10+
// the outputs to their according stdeout/stderr.
11+
func StreamCommand(workDir string, command string, args []string) error {
12+
cmd := exec.Command(command, args...)
13+
if workDir != "" {
14+
cmd.Dir = workDir
15+
}
16+
17+
acmd := New(cmd, 8)
18+
err := acmd.Run()
19+
if err != nil {
20+
return err
21+
}
22+
23+
// done is used to wait for stream readers to finish before exiting the function.
24+
done := make(chan struct{})
25+
26+
go func() {
27+
for b := range acmd.StdoutStream() {
28+
_, err := os.Stdout.Write(b)
29+
if err != nil {
30+
msg := fmt.Sprintf("failed to write to Stdout: %v", err)
31+
// should never happen, just panic.
32+
panic(msg)
33+
}
34+
}
35+
done <- struct{}{}
36+
}()
37+
go func() {
38+
for b := range acmd.StderrStream() {
39+
_, err := os.Stderr.Write(b)
40+
if err != nil {
41+
msg := fmt.Sprintf("failed to write to Stderr: %v", err)
42+
// should never happen, just panic.
43+
panic(msg)
44+
}
45+
}
46+
done <- struct{}{}
47+
}()
48+
49+
// wait for printers to finish
50+
defer func() { _, _ = <-done, <-done }()
51+
52+
// wait for command to finish
53+
err = acmd.Wait()
54+
if err != nil {
55+
return err
56+
}
57+
58+
if acmd.ExitCode() != 0 {
59+
return fmt.Errorf("exited with code %d", acmd.ExitCode())
60+
}
61+
return nil
62+
}

0 commit comments

Comments
 (0)