From 4377a7afe481daeabdd32e3477611d6ecf83c5e7 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sat, 2 May 2015 01:44:45 -0700 Subject: [PATCH 01/16] runproc sample --- cmd/circuit/main.go | 11 +++++++++++ cmd/circuit/procdkr.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/cmd/circuit/main.go b/cmd/circuit/main.go index 5e0d93f..2f183df 100644 --- a/cmd/circuit/main.go +++ b/cmd/circuit/main.go @@ -219,6 +219,17 @@ func main() { cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."}, }, }, + { + Name: "runproc", + Usage: "Run a process element and return output on stdout", + Action: runproc, + Flags: []cli.Flag{ + cli.StringFlag{"dial, d", "", "circuit member to dial into"}, + cli.StringFlag{"discover", "228.8.8.8:8822", "Multicast address for peer server discovery"}, + cli.BoolFlag{"scrub", "scrub the process anchor automatically on exit"}, + cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."}, + }, + }, { Name: "signal", Usage: "Send a signal to a running process", diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index c5d93e7..33b3926 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -11,6 +11,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "io" "os" "github.com/gocircuit/circuit/client" @@ -53,6 +54,43 @@ func mkproc(x *cli.Context) { } } +func runproc(x *cli.Context) { + defer func() { + if r := recover(); r != nil { + fatalf("error, likely due to missing server or misspelled anchor: %v", r) + } + }() + c := dial(x) + args := x.Args() + if len(args) != 1 { + fatalf("runproc needs an anchor argument") + } + w, _ := parseGlob(args[0]) + buf, _ := ioutil.ReadAll(os.Stdin) + var cmd client.Cmd + if err := json.Unmarshal(buf, &cmd); err != nil { + fatalf("command json not parsing: %v", err) + } + if x.Bool("scrub") { + cmd.Scrub = true + } + a := c.Walk(w) + p, err := a.MakeProc(cmd) + if err != nil { + fatalf("mkproc error: %s", err) + } + // ps := p.Peek() + // if ps.Exit != nil { + // fatalf("%v", ps.Exit) + // } + q := p.Stdin() + if err := q.Close(); err != nil { + fatalf("error closing stdin: %v", err) + } + io.Copy(os.Stdout, p.Stdout()) + a.Scrub() +} + func mkdkr(x *cli.Context) { defer func() { if r := recover(); r != nil { From a9bb99535f5a15cf0efea74549e28acabbee84c1 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sat, 2 May 2015 15:02:52 -0700 Subject: [PATCH 02/16] added a flag to prefix output lines with the anchor so it can be grepped easily --- cmd/circuit/main.go | 1 + cmd/circuit/procdkr.go | 24 +++++++++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/cmd/circuit/main.go b/cmd/circuit/main.go index 2f183df..37dc48c 100644 --- a/cmd/circuit/main.go +++ b/cmd/circuit/main.go @@ -226,6 +226,7 @@ func main() { Flags: []cli.Flag{ cli.StringFlag{"dial, d", "", "circuit member to dial into"}, cli.StringFlag{"discover", "228.8.8.8:8822", "Multicast address for peer server discovery"}, + cli.BoolFlag{"anchors", "show anchor names prepended to each line of output"}, cli.BoolFlag{"scrub", "scrub the process anchor automatically on exit"}, cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."}, }, diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 33b3926..73a10a6 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -13,6 +13,7 @@ import ( "io/ioutil" "io" "os" + "bufio" "github.com/gocircuit/circuit/client" "github.com/gocircuit/circuit/client/docker" @@ -79,15 +80,28 @@ func runproc(x *cli.Context) { if err != nil { fatalf("mkproc error: %s", err) } - // ps := p.Peek() - // if ps.Exit != nil { - // fatalf("%v", ps.Exit) - // } + q := p.Stdin() if err := q.Close(); err != nil { fatalf("error closing stdin: %v", err) } - io.Copy(os.Stdout, p.Stdout()) + + if x.Bool("anchors") { + + scanner := bufio.NewScanner(p.Stdout()) + for scanner.Scan() { + fmt.Fprintf(os.Stdout, "%s %s\n", a.Path(), scanner.Text()) + } + if err := scanner.Err(); err != nil { + fmt.Fprintln(os.Stderr, "error prefixing the data", err) + } + + } else { + + io.Copy(os.Stdout, p.Stdout()) + + } + a.Scrub() } From 9c8903be2b585d0bb87131a4a47f5d0d1791f9cf Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sat, 2 May 2015 15:05:10 -0700 Subject: [PATCH 03/16] gofmt --- cmd/circuit/procdkr.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 73a10a6..085d670 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -8,12 +8,12 @@ package main import ( + "bufio" "encoding/json" "fmt" - "io/ioutil" "io" + "io/ioutil" "os" - "bufio" "github.com/gocircuit/circuit/client" "github.com/gocircuit/circuit/client/docker" @@ -89,15 +89,15 @@ func runproc(x *cli.Context) { if x.Bool("anchors") { scanner := bufio.NewScanner(p.Stdout()) - for scanner.Scan() { - fmt.Fprintf(os.Stdout, "%s %s\n", a.Path(), scanner.Text()) - } - if err := scanner.Err(); err != nil { - fmt.Fprintln(os.Stderr, "error prefixing the data", err) - } + for scanner.Scan() { + fmt.Fprintf(os.Stdout, "%s %s\n", a.Path(), scanner.Text()) + } + if err := scanner.Err(); err != nil { + fmt.Fprintln(os.Stderr, "error prefixing the data", err) + } } else { - + io.Copy(os.Stdout, p.Stdout()) } @@ -144,7 +144,9 @@ func sgnl(x *cli.Context) { fatalf("signal needs an anchor and a signal name arguments") } w, _ := parseGlob(args[1]) - u, ok := c.Walk(w).Get().(interface{Signal(string) error}) + u, ok := c.Walk(w).Get().(interface { + Signal(string) error + }) if !ok { fatalf("anchor is not a process or a docker container") } From 53ec5f4791ff6237f962c29532e6ddab4ea6d5cc Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sat, 2 May 2015 15:36:18 -0700 Subject: [PATCH 04/16] no longer require the user to specify a name for the process --- cmd/circuit/procdkr.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 085d670..8c23019 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -66,7 +66,6 @@ func runproc(x *cli.Context) { if len(args) != 1 { fatalf("runproc needs an anchor argument") } - w, _ := parseGlob(args[0]) buf, _ := ioutil.ReadAll(os.Stdin) var cmd client.Cmd if err := json.Unmarshal(buf, &cmd); err != nil { @@ -75,6 +74,7 @@ func runproc(x *cli.Context) { if x.Bool("scrub") { cmd.Scrub = true } + w, _ := parseGlob(args[0] + "/" + cmd.Path) a := c.Walk(w) p, err := a.MakeProc(cmd) if err != nil { From cb0ea064f78186836c7f317690434312450f266a Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sat, 2 May 2015 21:17:53 -0700 Subject: [PATCH 05/16] add a Name field to the input json so the user can specify the name of the element via the json --- client/client.go | 2 +- client/proc.go | 3 +++ cmd/circuit/glob_test.go | 2 +- cmd/circuit/procdkr.go | 10 +++++++++- element/proc/cmd.go | 1 + element/proc/proc.go | 4 ++++ 6 files changed, 19 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index fe58cd1..059ae8e 100644 --- a/client/client.go +++ b/client/client.go @@ -163,4 +163,4 @@ func (c *Client) Get() interface{} { } // Scrub is an Anchor interface method, not applicable to the root-level anchor. -func (c *Client) Scrub() {} \ No newline at end of file +func (c *Client) Scrub() {} diff --git a/client/proc.go b/client/proc.go index 70e9262..3f11aa9 100644 --- a/client/proc.go +++ b/client/proc.go @@ -32,6 +32,7 @@ type Cmd struct { // If Scrub is set, the process element will automatically be removed from its anchor // when the process exits. Scrub bool + Name string } func retypeProcStat(c proc.Cmd) Cmd { @@ -41,6 +42,7 @@ func retypeProcStat(c proc.Cmd) Cmd { Path: c.Path, Args: c.Args, Scrub: c.Scrub, + Name: c.Name, } } @@ -51,6 +53,7 @@ func (cmd Cmd) retype() proc.Cmd { Path: cmd.Path, Args: cmd.Args, Scrub: cmd.Scrub, + Name: cmd.Name, } } diff --git a/cmd/circuit/glob_test.go b/cmd/circuit/glob_test.go index a2bd8a8..8556751 100644 --- a/cmd/circuit/glob_test.go +++ b/cmd/circuit/glob_test.go @@ -15,4 +15,4 @@ import ( func TestGlob(t *testing.T) { walk, ellipses := parseGlob("/X/hola/petar/...") log.Printf("w=%v ell=%v", walk, ellipses) -} \ No newline at end of file +} diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 8c23019..30baf17 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -74,7 +74,15 @@ func runproc(x *cli.Context) { if x.Bool("scrub") { cmd.Scrub = true } - w, _ := parseGlob(args[0] + "/" + cmd.Path) + + el := string("") + if len(cmd.Name) > 0 { + el = cmd.Name + } else { + el = cmd.Path + } + + w, _ := parseGlob(args[0] + "/" + el) a := c.Walk(w) p, err := a.MakeProc(cmd) if err != nil { diff --git a/element/proc/cmd.go b/element/proc/cmd.go index 8d302d3..a41c015 100644 --- a/element/proc/cmd.go +++ b/element/proc/cmd.go @@ -18,6 +18,7 @@ type Cmd struct { Path string `json:"path"` Args []string `json:"args"` Scrub bool `json:"scrub"` + Name string `json:"name"` } func ParseCmd(src string) (*Cmd, error) { diff --git a/element/proc/proc.go b/element/proc/proc.go index 7d81cb7..a7e2d7d 100644 --- a/element/proc/proc.go +++ b/element/proc/proc.go @@ -45,6 +45,7 @@ type proc struct { sync.Mutex cmd exec.Cmd scrb bool + name string abr chan<- struct{} wait chan<- error exit error // exit set by waiter @@ -66,6 +67,7 @@ func MakeProc(cmd Cmd) Proc { p.cmd.cmd.Dir = cmd.Dir bin := strings.TrimSpace(cmd.Path) p.cmd.cmd.Path = bin + p.cmd.name = cmd.Name p.cmd.cmd.Args = append([]string{bin}, cmd.Args...) p.cmd.scrb = cmd.Scrub // exec @@ -148,6 +150,7 @@ func (p *proc) GetCmd() Cmd { Path: p.cmd.cmd.Path, Args: p.cmd.cmd.Args[1:], Scrub: p.cmd.scrb, + Name: p.cmd.name, } } @@ -177,6 +180,7 @@ func (p *proc) peek() Stat { Path: p.cmd.cmd.Path, Args: p.cmd.cmd.Args[1:], Scrub: p.cmd.scrb, + Name: p.cmd.name, }, Exit: p.cmd.exit, Phase: p.phase().String(), From ca48604f17fbffbab33fe0a5e6188dd8af3c4cc9 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sat, 2 May 2015 23:30:37 -0700 Subject: [PATCH 06/16] make the runproc io go async so its much faster --- cmd/circuit/procdkr.go | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 30baf17..4fb9ec3 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -14,6 +14,7 @@ import ( "io" "io/ioutil" "os" + "time" "github.com/gocircuit/circuit/client" "github.com/gocircuit/circuit/client/docker" @@ -21,6 +22,8 @@ import ( "github.com/gocircuit/circuit/github.com/codegangsta/cli" ) + +var timeout = time.Duration(33) // circuit mkproc /X1234/hola/charlie << EOF // { … } // EOF @@ -96,12 +99,30 @@ func runproc(x *cli.Context) { if x.Bool("anchors") { - scanner := bufio.NewScanner(p.Stdout()) - for scanner.Scan() { - fmt.Fprintf(os.Stdout, "%s %s\n", a.Path(), scanner.Text()) - } - if err := scanner.Err(); err != nil { - fmt.Fprintln(os.Stderr, "error prefixing the data", err) + done := make(chan bool) + + go func(a string, r io.Reader, w io.Writer, d chan bool) { + + scanner := bufio.NewScanner(r) + for scanner.Scan() { + fmt.Fprintf(w, "%s %s\n", a, scanner.Text()) + } + if err := scanner.Err(); err != nil { + d <- false + } + + d <- true + + }(a.Path(), p.Stdout(), os.Stdout, done) + + select { + case rv := <-done: + if !rv { + fmt.Fprintln(os.Stderr, "error prefixing the data") + } + //make this timeout come from the json input payload + case <-time.After(time.Second * timeout): + fmt.Fprintln(os.Stderr, "timeout waiting for data") } } else { From 0eb107bcd145b295b7a1611671f2a34808dd8b2f Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sun, 3 May 2015 08:35:38 -0700 Subject: [PATCH 07/16] change "anchors" flag to "tag" and remove scrub flag since its not needed --- cmd/circuit/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/circuit/main.go b/cmd/circuit/main.go index 37dc48c..276b292 100644 --- a/cmd/circuit/main.go +++ b/cmd/circuit/main.go @@ -226,8 +226,7 @@ func main() { Flags: []cli.Flag{ cli.StringFlag{"dial, d", "", "circuit member to dial into"}, cli.StringFlag{"discover", "228.8.8.8:8822", "Multicast address for peer server discovery"}, - cli.BoolFlag{"anchors", "show anchor names prepended to each line of output"}, - cli.BoolFlag{"scrub", "scrub the process anchor automatically on exit"}, + cli.BoolFlag{"tag", "tag each output line with the anchor names"}, cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."}, }, }, From a00646846c8dea9f8b92040b0457329cdb80c4b3 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sun, 3 May 2015 09:37:53 -0700 Subject: [PATCH 08/16] get runproc ready to get the anchors itself --- cmd/circuit/procdkr.go | 70 +++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 4fb9ec3..e6555a0 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -16,10 +16,10 @@ import ( "os" "time" - "github.com/gocircuit/circuit/client" - "github.com/gocircuit/circuit/client/docker" + "github.com/mcqueenorama/circuit/client" + "github.com/mcqueenorama/circuit/client/docker" - "github.com/gocircuit/circuit/github.com/codegangsta/cli" + "github.com/mcqueenorama/circuit/github.com/codegangsta/cli" ) @@ -58,34 +58,10 @@ func mkproc(x *cli.Context) { } } -func runproc(x *cli.Context) { - defer func() { - if r := recover(); r != nil { - fatalf("error, likely due to missing server or misspelled anchor: %v", r) - } - }() - c := dial(x) - args := x.Args() - if len(args) != 1 { - fatalf("runproc needs an anchor argument") - } - buf, _ := ioutil.ReadAll(os.Stdin) - var cmd client.Cmd - if err := json.Unmarshal(buf, &cmd); err != nil { - fatalf("command json not parsing: %v", err) - } - if x.Bool("scrub") { - cmd.Scrub = true - } - - el := string("") - if len(cmd.Name) > 0 { - el = cmd.Name - } else { - el = cmd.Path - } +func _runproc (c *client.Client, cmd client.Cmd, dir string, tags bool) { - w, _ := parseGlob(args[0] + "/" + el) + // w, _ := parseGlob(args[0] + "/" + el) + w, _ := parseGlob(dir) a := c.Walk(w) p, err := a.MakeProc(cmd) if err != nil { @@ -97,7 +73,8 @@ func runproc(x *cli.Context) { fatalf("error closing stdin: %v", err) } - if x.Bool("anchors") { + // if x.Bool("tags") { + if tags { done := make(chan bool) @@ -134,6 +111,37 @@ func runproc(x *cli.Context) { a.Scrub() } +func runproc(x *cli.Context) { + defer func() { + if r := recover(); r != nil { + fatalf("error, likely due to missing server or misspelled anchor: %v", r) + } + }() + c := dial(x) + args := x.Args() + if len(args) != 1 { + fatalf("runproc needs an anchor argument") + } + buf, _ := ioutil.ReadAll(os.Stdin) + var cmd client.Cmd + if err := json.Unmarshal(buf, &cmd); err != nil { + fatalf("command json not parsing: %v", err) + } + if x.Bool("scrub") { + cmd.Scrub = true + } + + el := string("") + if len(cmd.Name) > 0 { + el = cmd.Name + } else { + el = cmd.Path + } + + _runproc(c, cmd, args[0] + "/" + el, x.Bool("tag")) + +} + func mkdkr(x *cli.Context) { defer func() { if r := recover(); r != nil { From b9b4e091aafe850f3f7d0f768aefcda5c5059de0 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sun, 3 May 2015 12:37:03 -0700 Subject: [PATCH 09/16] get the standalone runproc check going --- cmd/circuit/procdkr.go | 91 ++++++++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index e6555a0..db0d9e8 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -22,8 +22,9 @@ import ( "github.com/mcqueenorama/circuit/github.com/codegangsta/cli" ) - +//make this timeout come from the json input payload var timeout = time.Duration(33) + // circuit mkproc /X1234/hola/charlie << EOF // { … } // EOF @@ -58,11 +59,58 @@ func mkproc(x *cli.Context) { } } -func _runproc (c *client.Client, cmd client.Cmd, dir string, tags bool) { +func runproc(x *cli.Context) { + defer func() { + if r := recover(); r != nil { + fatalf("error, likely due to missing server or misspelled anchor: %v", r) + } + }() + c := dial(x) + args := x.Args() + + if len(args) != 1 && !x.Bool("all") { + fatalf("runproc needs an anchor argument or use the --all flag to do the entire circuit") + } + buf, _ := ioutil.ReadAll(os.Stdin) + var cmd client.Cmd + if err := json.Unmarshal(buf, &cmd); err != nil { + fatalf("command json not parsing: %v", err) + } + if x.Bool("scrub") { + cmd.Scrub = true + } + + el := string("") + if len(cmd.Name) > 0 { + el = cmd.Name + } else { + el = cmd.Path + } + + if x.Bool("all") { + + w, _ := parseGlob("/") + + anchor := c.Walk(w) + + for _, a := range anchor.View() { + w2, _ := parseGlob(a.Path() + "/" + el) + a2 := c.Walk(w2) + _runproc(a2, cmd, x.Bool("tag")) + } + + } else { + + w, _ := parseGlob(args[0] + "/" + el) + a := c.Walk(w) + _runproc(a, cmd, x.Bool("tag")) + + } + +} + +func _runproc(a client.Anchor, cmd client.Cmd, tags bool) { - // w, _ := parseGlob(args[0] + "/" + el) - w, _ := parseGlob(dir) - a := c.Walk(w) p, err := a.MakeProc(cmd) if err != nil { fatalf("mkproc error: %s", err) @@ -73,7 +121,6 @@ func _runproc (c *client.Client, cmd client.Cmd, dir string, tags bool) { fatalf("error closing stdin: %v", err) } - // if x.Bool("tags") { if tags { done := make(chan bool) @@ -97,7 +144,6 @@ func _runproc (c *client.Client, cmd client.Cmd, dir string, tags bool) { if !rv { fmt.Fprintln(os.Stderr, "error prefixing the data") } - //make this timeout come from the json input payload case <-time.After(time.Second * timeout): fmt.Fprintln(os.Stderr, "timeout waiting for data") } @@ -111,37 +157,6 @@ func _runproc (c *client.Client, cmd client.Cmd, dir string, tags bool) { a.Scrub() } -func runproc(x *cli.Context) { - defer func() { - if r := recover(); r != nil { - fatalf("error, likely due to missing server or misspelled anchor: %v", r) - } - }() - c := dial(x) - args := x.Args() - if len(args) != 1 { - fatalf("runproc needs an anchor argument") - } - buf, _ := ioutil.ReadAll(os.Stdin) - var cmd client.Cmd - if err := json.Unmarshal(buf, &cmd); err != nil { - fatalf("command json not parsing: %v", err) - } - if x.Bool("scrub") { - cmd.Scrub = true - } - - el := string("") - if len(cmd.Name) > 0 { - el = cmd.Name - } else { - el = cmd.Path - } - - _runproc(c, cmd, args[0] + "/" + el, x.Bool("tag")) - -} - func mkdkr(x *cli.Context) { defer func() { if r := recover(); r != nil { From d8bd8effcf34daf35987c9a4677da34376a7f198 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sun, 3 May 2015 12:43:29 -0700 Subject: [PATCH 10/16] reset import paths for pull request --- cmd/circuit/main.go | 3 ++- cmd/circuit/procdkr.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/circuit/main.go b/cmd/circuit/main.go index 276b292..4897433 100644 --- a/cmd/circuit/main.go +++ b/cmd/circuit/main.go @@ -226,7 +226,8 @@ func main() { Flags: []cli.Flag{ cli.StringFlag{"dial, d", "", "circuit member to dial into"}, cli.StringFlag{"discover", "228.8.8.8:8822", "Multicast address for peer server discovery"}, - cli.BoolFlag{"tag", "tag each output line with the anchor names"}, + cli.BoolFlag{"tag", "tag each output line with the anchor names (hostnames)"}, + cli.BoolFlag{"all", "run the command across all hosts"}, cli.StringFlag{"hmac", "", "File containing HMAC credentials. Use RC4 encryption."}, }, }, diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index db0d9e8..bbc57a4 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -16,10 +16,10 @@ import ( "os" "time" - "github.com/mcqueenorama/circuit/client" - "github.com/mcqueenorama/circuit/client/docker" + "github.com/gocircuit/circuit/client" + "github.com/gocircuit/circuit/client/docker" - "github.com/mcqueenorama/circuit/github.com/codegangsta/cli" + "github.com/gocircuit/circuit/github.com/codegangsta/cli" ) //make this timeout come from the json input payload From 049ebd7306afe7c3750b751afa3e270f2a4d2c58 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Sun, 3 May 2015 19:07:48 -0700 Subject: [PATCH 11/16] make the --all commands go async for speed --- cmd/circuit/procdkr.go | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index bbc57a4..1e5edb8 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -59,6 +59,14 @@ func mkproc(x *cli.Context) { } } +func doRun(x *cli.Context, c *client.Client, cmd client.Cmd, path string) { + + w2, _ := parseGlob(path) + a2 := c.Walk(w2) + _runproc(a2, cmd, x.Bool("tag")) + +} + func runproc(x *cli.Context) { defer func() { if r := recover(); r != nil { @@ -93,17 +101,36 @@ func runproc(x *cli.Context) { anchor := c.Walk(w) + done := make(chan bool, 10) + procs := 0 + for _, a := range anchor.View() { - w2, _ := parseGlob(a.Path() + "/" + el) - a2 := c.Walk(w2) - _runproc(a2, cmd, x.Bool("tag")) + + procs++ + + go func(x *cli.Context, cmd client.Cmd, a string, done chan bool) { + + doRun(x, c, cmd, a) + done <- true + + }(x, cmd, a.Path()+"/"+el, done) + + } + + for ; procs > 0 ; procs-- { + + select { + case <-done: + continue + case <-time.After(time.Second * timeout): + fmt.Fprintln(os.Stderr, "timeout waiting for the command") + } + } } else { - w, _ := parseGlob(args[0] + "/" + el) - a := c.Walk(w) - _runproc(a, cmd, x.Bool("tag")) + doRun(x, c, cmd, args[0]+"/"+el) } From b5c5b7a674a2953ab9eddb0b666fc7a0c76d4d9f Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Mon, 25 May 2015 18:43:53 -0700 Subject: [PATCH 12/16] remove new Name field according to discussion and put all in runproc/$random using keygen --- client/proc.go | 2 -- cmd/circuit/hmac.go | 11 +++++++++-- cmd/circuit/main.go | 2 +- cmd/circuit/procdkr.go | 18 ++++++------------ element/proc/cmd.go | 1 - element/proc/proc.go | 4 ---- 6 files changed, 16 insertions(+), 22 deletions(-) diff --git a/client/proc.go b/client/proc.go index 3f11aa9..24bb65e 100644 --- a/client/proc.go +++ b/client/proc.go @@ -42,7 +42,6 @@ func retypeProcStat(c proc.Cmd) Cmd { Path: c.Path, Args: c.Args, Scrub: c.Scrub, - Name: c.Name, } } @@ -53,7 +52,6 @@ func (cmd Cmd) retype() proc.Cmd { Path: cmd.Path, Args: cmd.Args, Scrub: cmd.Scrub, - Name: cmd.Name, } } diff --git a/cmd/circuit/hmac.go b/cmd/circuit/hmac.go index fbe881c..59e0f0a 100644 --- a/cmd/circuit/hmac.go +++ b/cmd/circuit/hmac.go @@ -17,7 +17,7 @@ import ( "github.com/gocircuit/circuit/github.com/codegangsta/cli" ) -func keygen(c *cli.Context) { +func keygen(c *cli.Context) string { rand.Seed(time.Now().UnixNano()) seed := make([]byte, 32) for i, _ := range seed { @@ -25,5 +25,12 @@ func keygen(c *cli.Context) { } key := sha512.Sum512(seed) text := base64.StdEncoding.EncodeToString(key[:]) - fmt.Println(text) + + return text +} + +func keygenPrint(c *cli.Context) { + + fmt.Println(keygen(c)) + } diff --git a/cmd/circuit/main.go b/cmd/circuit/main.go index 4897433..87feada 100644 --- a/cmd/circuit/main.go +++ b/cmd/circuit/main.go @@ -37,7 +37,7 @@ func main() { { Name: "keygen", Usage: "Generate a new random HMAC key", - Action: keygen, + Action: keygenPrint, }, { Name: "ls", diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 1e5edb8..1fbdf1b 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -77,23 +77,16 @@ func runproc(x *cli.Context) { args := x.Args() if len(args) != 1 && !x.Bool("all") { - fatalf("runproc needs an anchor argument or use the --all flag to do the entire circuit") + fatalf("runproc needs an anchor argument or use the --all flag to to execute on every host in the circuit") } buf, _ := ioutil.ReadAll(os.Stdin) var cmd client.Cmd if err := json.Unmarshal(buf, &cmd); err != nil { fatalf("command json not parsing: %v", err) } - if x.Bool("scrub") { - cmd.Scrub = true - } + cmd.Scrub = true - el := string("") - if len(cmd.Name) > 0 { - el = cmd.Name - } else { - el = cmd.Path - } + el := "/runproc/" + keygen(x) if x.Bool("all") { @@ -113,7 +106,8 @@ func runproc(x *cli.Context) { doRun(x, c, cmd, a) done <- true - }(x, cmd, a.Path()+"/"+el, done) + // }(x, cmd, a.Path()+"/"+cmd.Path, done) + }(x, cmd, a.Path()+el, done) } @@ -130,7 +124,7 @@ func runproc(x *cli.Context) { } else { - doRun(x, c, cmd, args[0]+"/"+el) + doRun(x, c, cmd, args[0]+el) } diff --git a/element/proc/cmd.go b/element/proc/cmd.go index a41c015..8d302d3 100644 --- a/element/proc/cmd.go +++ b/element/proc/cmd.go @@ -18,7 +18,6 @@ type Cmd struct { Path string `json:"path"` Args []string `json:"args"` Scrub bool `json:"scrub"` - Name string `json:"name"` } func ParseCmd(src string) (*Cmd, error) { diff --git a/element/proc/proc.go b/element/proc/proc.go index a7e2d7d..7d81cb7 100644 --- a/element/proc/proc.go +++ b/element/proc/proc.go @@ -45,7 +45,6 @@ type proc struct { sync.Mutex cmd exec.Cmd scrb bool - name string abr chan<- struct{} wait chan<- error exit error // exit set by waiter @@ -67,7 +66,6 @@ func MakeProc(cmd Cmd) Proc { p.cmd.cmd.Dir = cmd.Dir bin := strings.TrimSpace(cmd.Path) p.cmd.cmd.Path = bin - p.cmd.name = cmd.Name p.cmd.cmd.Args = append([]string{bin}, cmd.Args...) p.cmd.scrb = cmd.Scrub // exec @@ -150,7 +148,6 @@ func (p *proc) GetCmd() Cmd { Path: p.cmd.cmd.Path, Args: p.cmd.cmd.Args[1:], Scrub: p.cmd.scrb, - Name: p.cmd.name, } } @@ -180,7 +177,6 @@ func (p *proc) peek() Stat { Path: p.cmd.cmd.Path, Args: p.cmd.cmd.Args[1:], Scrub: p.cmd.scrb, - Name: p.cmd.name, }, Exit: p.cmd.exit, Phase: p.phase().String(), From 6e1fa0568fefe6f14b8cf4279a47810be04328a0 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Mon, 25 May 2015 18:46:28 -0700 Subject: [PATCH 13/16] missed an instance of Name --- client/proc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/proc.go b/client/proc.go index 24bb65e..81267a4 100644 --- a/client/proc.go +++ b/client/proc.go @@ -32,7 +32,7 @@ type Cmd struct { // If Scrub is set, the process element will automatically be removed from its anchor // when the process exits. Scrub bool - Name string + } func retypeProcStat(c proc.Cmd) Cmd { From ba9f6745d58d7e12b26131f4f1a8d7ea7849c36b Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Mon, 25 May 2015 20:53:16 -0700 Subject: [PATCH 14/16] fix up the prefixed writing and remove the Scrub since they are autoscrubbed --- cmd/circuit/procdkr.go | 34 ++++++++-------------------------- 1 file changed, 8 insertions(+), 26 deletions(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 1fbdf1b..9fb8975 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -18,6 +18,7 @@ import ( "github.com/gocircuit/circuit/client" "github.com/gocircuit/circuit/client/docker" + "github.com/gocircuit/circuit/kit/iomisc" "github.com/gocircuit/circuit/github.com/codegangsta/cli" ) @@ -63,7 +64,7 @@ func doRun(x *cli.Context, c *client.Client, cmd client.Cmd, path string) { w2, _ := parseGlob(path) a2 := c.Walk(w2) - _runproc(a2, cmd, x.Bool("tag")) + _runproc(x, c, a2, cmd) } @@ -130,7 +131,7 @@ func runproc(x *cli.Context) { } -func _runproc(a client.Anchor, cmd client.Cmd, tags bool) { +func _runproc(x *cli.Context, c *client.Client, a client.Anchor, cmd client.Cmd) { p, err := a.MakeProc(cmd) if err != nil { @@ -142,31 +143,13 @@ func _runproc(a client.Anchor, cmd client.Cmd, tags bool) { fatalf("error closing stdin: %v", err) } - if tags { + if x.Bool("tag") { - done := make(chan bool) + r := iomisc.PrefixReader(a.Addr() + " ", p.Stdout()) - go func(a string, r io.Reader, w io.Writer, d chan bool) { - - scanner := bufio.NewScanner(r) - for scanner.Scan() { - fmt.Fprintf(w, "%s %s\n", a, scanner.Text()) - } - if err := scanner.Err(); err != nil { - d <- false - } - - d <- true - - }(a.Path(), p.Stdout(), os.Stdout, done) - - select { - case rv := <-done: - if !rv { - fmt.Fprintln(os.Stderr, "error prefixing the data") - } - case <-time.After(time.Second * timeout): - fmt.Fprintln(os.Stderr, "timeout waiting for data") + scanner := bufio.NewScanner(r) + for scanner.Scan() { + fmt.Println(scanner.Text()) } } else { @@ -175,7 +158,6 @@ func _runproc(a client.Anchor, cmd client.Cmd, tags bool) { } - a.Scrub() } func mkdkr(x *cli.Context) { From 7338ad0aa25a08deaa080eecd74d9c005e149311 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Tue, 26 May 2015 08:08:42 -0700 Subject: [PATCH 15/16] redo the waiting use p.Wait() --- cmd/circuit/procdkr.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 9fb8975..684126e 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -14,7 +14,6 @@ import ( "io" "io/ioutil" "os" - "time" "github.com/gocircuit/circuit/client" "github.com/gocircuit/circuit/client/docker" @@ -23,9 +22,6 @@ import ( "github.com/gocircuit/circuit/github.com/codegangsta/cli" ) -//make this timeout come from the json input payload -var timeout = time.Duration(33) - // circuit mkproc /X1234/hola/charlie << EOF // { … } // EOF @@ -60,11 +56,11 @@ func mkproc(x *cli.Context) { } } -func doRun(x *cli.Context, c *client.Client, cmd client.Cmd, path string) { +func doRun(x *cli.Context, c *client.Client, cmd client.Cmd, path string, done chan bool) { w2, _ := parseGlob(path) a2 := c.Walk(w2) - _runproc(x, c, a2, cmd) + _runproc(x, c, a2, cmd, done) } @@ -89,13 +85,13 @@ func runproc(x *cli.Context) { el := "/runproc/" + keygen(x) + done := make(chan bool, 10) if x.Bool("all") { w, _ := parseGlob("/") anchor := c.Walk(w) - done := make(chan bool, 10) procs := 0 for _, a := range anchor.View() { @@ -104,10 +100,8 @@ func runproc(x *cli.Context) { go func(x *cli.Context, cmd client.Cmd, a string, done chan bool) { - doRun(x, c, cmd, a) - done <- true + doRun(x, c, cmd, a, done) - // }(x, cmd, a.Path()+"/"+cmd.Path, done) }(x, cmd, a.Path()+el, done) } @@ -117,21 +111,19 @@ func runproc(x *cli.Context) { select { case <-done: continue - case <-time.After(time.Second * timeout): - fmt.Fprintln(os.Stderr, "timeout waiting for the command") } } } else { - doRun(x, c, cmd, args[0]+el) + doRun(x, c, cmd, args[0]+el, done) } } -func _runproc(x *cli.Context, c *client.Client, a client.Anchor, cmd client.Cmd) { +func _runproc(x *cli.Context, c *client.Client, a client.Anchor, cmd client.Cmd, done chan bool) { p, err := a.MakeProc(cmd) if err != nil { @@ -157,6 +149,8 @@ func _runproc(x *cli.Context, c *client.Client, a client.Anchor, cmd client.Cmd) io.Copy(os.Stdout, p.Stdout()) } + p.Wait() + done <- true } From af9af766505f8d66c67298bc8e4c2535d07fdb56 Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Tue, 26 May 2015 08:38:43 -0700 Subject: [PATCH 16/16] get stderr of the process too --- cmd/circuit/procdkr.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/cmd/circuit/procdkr.go b/cmd/circuit/procdkr.go index 684126e..4e9350c 100644 --- a/cmd/circuit/procdkr.go +++ b/cmd/circuit/procdkr.go @@ -118,6 +118,7 @@ func runproc(x *cli.Context) { } else { doRun(x, c, cmd, args[0]+el, done) + <-done } @@ -130,23 +131,30 @@ func _runproc(x *cli.Context, c *client.Client, a client.Anchor, cmd client.Cmd, fatalf("mkproc error: %s", err) } - q := p.Stdin() - if err := q.Close(); err != nil { + stdin := p.Stdin() + if err := stdin.Close(); err != nil { fatalf("error closing stdin: %v", err) } if x.Bool("tag") { - r := iomisc.PrefixReader(a.Addr() + " ", p.Stdout()) + stdout := iomisc.PrefixReader(a.Addr() + " ", p.Stdout()) + stderr := iomisc.PrefixReader(a.Addr() + " ", p.Stderr()) - scanner := bufio.NewScanner(r) - for scanner.Scan() { - fmt.Println(scanner.Text()) + stdoutScanner := bufio.NewScanner(stdout) + for stdoutScanner.Scan() { + fmt.Println(stdoutScanner.Text()) + } + + stderrScanner := bufio.NewScanner(stderr) + for stderrScanner.Scan() { + fmt.Println(stderrScanner.Text()) } } else { io.Copy(os.Stdout, p.Stdout()) + io.Copy(os.Stderr, p.Stderr()) } p.Wait()