Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,4 @@ func (c *Client) Get() interface{} {
}

// Scrub is an Anchor interface method, not applicable to the root-level anchor.
func (c *Client) Scrub() {}
func (c *Client) Scrub() {}
3 changes: 3 additions & 0 deletions client/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Cmd struct {
// If Scrub is set, the process element will automatically be removed from its anchor
// when the process exits.
Scrub bool
Name string
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not be including a Name field in the command config. The names of commands are their anchor paths. (Configs and names are thus decoupled.)

If you simply want to run one-off commands and don't care what they are named, place all one off commands in
anchors like /X.../runproc/RANDOM_ID

If you want the user to specify the anchor of a one-off command that is executed, say, on all machines*, then you can take a name argument somewhere (command-line or otherwise), but you cannot add that name to the Cmd struct.
By the way, the name is really an anchor path without the first part.

I am not really sure why ever the user would want to name a one-off job that gets executed everywhere.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of this for showing the the name in the tag, but if the command is a one-off and will be auto-scrubbed, then there's no point in doing a name. I agree, there's no need for this.

}

func retypeProcStat(c proc.Cmd) Cmd {
Expand All @@ -41,6 +42,7 @@ func retypeProcStat(c proc.Cmd) Cmd {
Path: c.Path,
Args: c.Args,
Scrub: c.Scrub,
Name: c.Name,
}
}

Expand All @@ -51,6 +53,7 @@ func (cmd Cmd) retype() proc.Cmd {
Path: cmd.Path,
Args: cmd.Args,
Scrub: cmd.Scrub,
Name: cmd.Name,
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/circuit/glob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ import (
func TestGlob(t *testing.T) {
walk, ellipses := parseGlob("/X/hola/petar/...")
log.Printf("w=%v ell=%v", walk, ellipses)
}
}
12 changes: 12 additions & 0 deletions cmd/circuit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ func main() {
cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."},
},
},
{
Name: "runproc",
Usage: "Run a process element and return output on stdout",
Action: runproc,
Flags: []cli.Flag{
cli.StringFlag{"dial, d", "", "circuit member to dial into"},
cli.StringFlag{"discover", "228.8.8.8:8822", "Multicast address for peer server discovery"},
cli.BoolFlag{"tag", "tag each output line with the anchor names (hostnames)"},
cli.BoolFlag{"all", "run the command across all hosts"},
cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."},
},
},
{
Name: "signal",
Usage: "Send a signal to a running process",
Expand Down
135 changes: 134 additions & 1 deletion cmd/circuit/procdkr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
package main

import (
"bufio"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"time"

"github.com/gocircuit/circuit/client"
"github.com/gocircuit/circuit/client/docker"

"github.com/gocircuit/circuit/github.com/codegangsta/cli"
)

//make this timeout come from the json input payload
var timeout = time.Duration(33)

// circuit mkproc /X1234/hola/charlie << EOF
// { … }
// EOF
Expand Down Expand Up @@ -53,6 +59,131 @@ func mkproc(x *cli.Context) {
}
}

func doRun(x *cli.Context, c *client.Client, cmd client.Cmd, path string) {

w2, _ := parseGlob(path)
a2 := c.Walk(w2)
_runproc(a2, cmd, x.Bool("tag"))

}

func runproc(x *cli.Context) {
defer func() {
if r := recover(); r != nil {
fatalf("error, likely due to missing server or misspelled anchor: %v", r)
}
}()
c := dial(x)
args := x.Args()

if len(args) != 1 && !x.Bool("all") {
fatalf("runproc needs an anchor argument or use the --all flag to do the entire circuit")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"to do the entire circuit" should be "to execute on every host in the circuit"

}
buf, _ := ioutil.ReadAll(os.Stdin)
var cmd client.Cmd
if err := json.Unmarshal(buf, &cmd); err != nil {
fatalf("command json not parsing: %v", err)
}
if x.Bool("scrub") {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cmd.Scrub should always be true, regardless of command-line arguments, since you are executing one-off jobs so you always want the system to remove the process element automatically when the process exits.

cmd.Scrub = true
}

el := string("")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

el := ""

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha, yea.

if len(cmd.Name) > 0 {
el = cmd.Name
} else {
el = cmd.Path
}

if x.Bool("all") {

w, _ := parseGlob("/")

anchor := c.Walk(w)

done := make(chan bool, 10)
procs := 0

for _, a := range anchor.View() {

procs++

go func(x *cli.Context, cmd client.Cmd, a string, done chan bool) {

doRun(x, c, cmd, a)
done <- true

}(x, cmd, a.Path()+"/"+el, done)

}

for ; procs > 0 ; procs-- {

select {
case <-done:
continue
case <-time.After(time.Second * timeout):
fmt.Fprintln(os.Stderr, "timeout waiting for the command")
}

}

} else {

doRun(x, c, cmd, args[0]+"/"+el)

}

}

func _runproc(a client.Anchor, cmd client.Cmd, tags bool) {

p, err := a.MakeProc(cmd)
if err != nil {
fatalf("mkproc error: %s", err)
}

q := p.Stdin()
if err := q.Close(); err != nil {
fatalf("error closing stdin: %v", err)
}

if tags {

done := make(chan bool)

go func(a string, r io.Reader, w io.Writer, d chan bool) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use PrefixReader or PrefixWriter from package kit/iomisc instead.

scanner := bufio.NewScanner(r)
for scanner.Scan() {
fmt.Fprintf(w, "%s %s\n", a, scanner.Text())
}
if err := scanner.Err(); err != nil {
d <- false
}

d <- true

}(a.Path(), p.Stdout(), os.Stdout, done)

select {
case rv := <-done:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are using the EOF from the process' stdout as an indicator of process exit. That's a bug. (What about stderr?)
All you need to do is call p.Wait(). That's the only correct thing to do in fact (in POSIX, not just the circuit).

if !rv {
fmt.Fprintln(os.Stderr, "error prefixing the data")
}
case <-time.After(time.Second * timeout):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's bad design. If the user doesn't want to wait, they can CTRL-C or wrap the whole invocation to the circuit tool with a another tool that kills it on time out, or at the bare minimum the timeout should be a command-line flag with a default value. Picking arbitrary timeouts is not good.

fmt.Fprintln(os.Stderr, "timeout waiting for data")
}

} else {

io.Copy(os.Stdout, p.Stdout())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about stderr?


}

a.Scrub()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a.Scrub() will kill the process immediately. That's a bug. All you need is to set the Scrub parameter in Cmd to true, this will take care of scrubbing the anchor automatically when the process ends. I pointed this out in a comment above.

}

func mkdkr(x *cli.Context) {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -92,7 +223,9 @@ func sgnl(x *cli.Context) {
fatalf("signal needs an anchor and a signal name arguments")
}
w, _ := parseGlob(args[1])
u, ok := c.Walk(w).Get().(interface{Signal(string) error})
u, ok := c.Walk(w).Get().(interface {
Signal(string) error
})
if !ok {
fatalf("anchor is not a process or a docker container")
}
Expand Down
1 change: 1 addition & 0 deletions element/proc/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Cmd struct {
Path string `json:"path"`
Args []string `json:"args"`
Scrub bool `json:"scrub"`
Name string `json:"name"`
}

func ParseCmd(src string) (*Cmd, error) {
Expand Down
4 changes: 4 additions & 0 deletions element/proc/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type proc struct {
sync.Mutex
cmd exec.Cmd
scrb bool
name string
abr chan<- struct{}
wait chan<- error
exit error // exit set by waiter
Expand All @@ -66,6 +67,7 @@ func MakeProc(cmd Cmd) Proc {
p.cmd.cmd.Dir = cmd.Dir
bin := strings.TrimSpace(cmd.Path)
p.cmd.cmd.Path = bin
p.cmd.name = cmd.Name
p.cmd.cmd.Args = append([]string{bin}, cmd.Args...)
p.cmd.scrb = cmd.Scrub
// exec
Expand Down Expand Up @@ -148,6 +150,7 @@ func (p *proc) GetCmd() Cmd {
Path: p.cmd.cmd.Path,
Args: p.cmd.cmd.Args[1:],
Scrub: p.cmd.scrb,
Name: p.cmd.name,
}
}

Expand Down Expand Up @@ -177,6 +180,7 @@ func (p *proc) peek() Stat {
Path: p.cmd.cmd.Path,
Args: p.cmd.cmd.Args[1:],
Scrub: p.cmd.scrb,
Name: p.cmd.name,
},
Exit: p.cmd.exit,
Phase: p.phase().String(),
Expand Down