diff --git a/client/client.go b/client/client.go index fe58cd1..059ae8e 100644 --- a/client/client.go +++ b/client/client.go @@ -163,4 +163,4 @@ func (c *Client) Get() interface{} { } // Scrub is an Anchor interface method, not applicable to the root-level anchor. -func (c *Client) Scrub() {} \ No newline at end of file +func (c *Client) Scrub() {} diff --git a/client/proc.go b/client/proc.go index 70e9262..81267a4 100644 --- a/client/proc.go +++ b/client/proc.go @@ -32,6 +32,7 @@ type Cmd struct { // If Scrub is set, the process element will automatically be removed from its anchor // when the process exits. Scrub bool + } func retypeProcStat(c proc.Cmd) Cmd { diff --git a/cmd/circuit/glob_test.go b/cmd/circuit/glob_test.go index a2bd8a8..8556751 100644 --- a/cmd/circuit/glob_test.go +++ b/cmd/circuit/glob_test.go @@ -15,4 +15,4 @@ import ( func TestGlob(t *testing.T) { walk, ellipses := parseGlob("/X/hola/petar/...") log.Printf("w=%v ell=%v", walk, ellipses) -} \ No newline at end of file +} diff --git a/cmd/circuit/hmac.go b/cmd/circuit/hmac.go index fbe881c..59e0f0a 100644 --- a/cmd/circuit/hmac.go +++ b/cmd/circuit/hmac.go @@ -17,7 +17,7 @@ import ( "github.com/gocircuit/circuit/github.com/codegangsta/cli" ) -func keygen(c *cli.Context) { +func keygen(c *cli.Context) string { rand.Seed(time.Now().UnixNano()) seed := make([]byte, 32) for i, _ := range seed { @@ -25,5 +25,12 @@ func keygen(c *cli.Context) { } key := sha512.Sum512(seed) text := base64.StdEncoding.EncodeToString(key[:]) - fmt.Println(text) + + return text +} + +func keygenPrint(c *cli.Context) { + + fmt.Println(keygen(c)) + } diff --git a/cmd/circuit/main.go b/cmd/circuit/main.go index 5e0d93f..87feada 100644 --- a/cmd/circuit/main.go +++ b/cmd/circuit/main.go @@ -37,7 +37,7 @@ func main() { { Name: "keygen", Usage: "Generate a new random HMAC key", - Action: keygen, + Action: keygenPrint, }, { Name: "ls", @@ -219,6 +219,18 @@ func main() { cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."}, }, }, + { + Name: "runproc", + Usage: "Run a process element and return output on stdout", + Action: runproc, + Flags: []cli.Flag{ + cli.StringFlag{"dial, d", "", "circuit member to dial into"}, + cli.StringFlag{"discover", "228.8.8.8:8822", "Multicast address for peer server discovery"}, + cli.BoolFlag{"tag", "tag each output line with the anchor names (hostnames)"}, + cli.BoolFlag{"all", "run the command across all hosts"}, + cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."}, + }, + }, { Name: "signal", Usage: "Send a signal to a running process", diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index c5d93e7..4e9350c 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -8,13 +8,16 @@ package main import ( + "bufio" "encoding/json" "fmt" + "io" "io/ioutil" "os" "github.com/gocircuit/circuit/client" "github.com/gocircuit/circuit/client/docker" + "github.com/gocircuit/circuit/kit/iomisc" "github.com/gocircuit/circuit/github.com/codegangsta/cli" ) @@ -53,6 +56,112 @@ func mkproc(x *cli.Context) { } } +func doRun(x *cli.Context, c *client.Client, cmd client.Cmd, path string, done chan bool) { + + w2, _ := parseGlob(path) + a2 := c.Walk(w2) + _runproc(x, c, a2, cmd, done) + +} + +func runproc(x *cli.Context) { + defer func() { + if r := recover(); r != nil { + fatalf("error, likely due to missing server or misspelled anchor: %v", r) + } + }() + c := dial(x) + args := x.Args() + + if len(args) != 1 && !x.Bool("all") { + fatalf("runproc needs an anchor argument or use the --all flag to to execute on every host in the circuit") + } + buf, _ := ioutil.ReadAll(os.Stdin) + var cmd client.Cmd + if err := json.Unmarshal(buf, &cmd); err != nil { + fatalf("command json not parsing: %v", err) + } + cmd.Scrub = true + + el := "/runproc/" + keygen(x) + + done := make(chan bool, 10) + if x.Bool("all") { + + w, _ := parseGlob("/") + + anchor := c.Walk(w) + + procs := 0 + + for _, a := range anchor.View() { + + procs++ + + go func(x *cli.Context, cmd client.Cmd, a string, done chan bool) { + + doRun(x, c, cmd, a, done) + + }(x, cmd, a.Path()+el, done) + + } + + for ; procs > 0 ; procs-- { + + select { + case <-done: + continue + } + + } + + } else { + + doRun(x, c, cmd, args[0]+el, done) + <-done + + } + +} + +func _runproc(x *cli.Context, c *client.Client, a client.Anchor, cmd client.Cmd, done chan bool) { + + p, err := a.MakeProc(cmd) + if err != nil { + fatalf("mkproc error: %s", err) + } + + stdin := p.Stdin() + if err := stdin.Close(); err != nil { + fatalf("error closing stdin: %v", err) + } + + if x.Bool("tag") { + + stdout := iomisc.PrefixReader(a.Addr() + " ", p.Stdout()) + stderr := iomisc.PrefixReader(a.Addr() + " ", p.Stderr()) + + stdoutScanner := bufio.NewScanner(stdout) + for stdoutScanner.Scan() { + fmt.Println(stdoutScanner.Text()) + } + + stderrScanner := bufio.NewScanner(stderr) + for stderrScanner.Scan() { + fmt.Println(stderrScanner.Text()) + } + + } else { + + io.Copy(os.Stdout, p.Stdout()) + io.Copy(os.Stderr, p.Stderr()) + + } + p.Wait() + done <- true + +} + func mkdkr(x *cli.Context) { defer func() { if r := recover(); r != nil { @@ -92,7 +201,9 @@ func sgnl(x *cli.Context) { fatalf("signal needs an anchor and a signal name arguments") } w, _ := parseGlob(args[1]) - u, ok := c.Walk(w).Get().(interface{Signal(string) error}) + u, ok := c.Walk(w).Get().(interface { + Signal(string) error + }) if !ok { fatalf("anchor is not a process or a docker container") }