Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit ce06e23

Browse files
author
David Chung
authored
instance tracker (#494)
Signed-off-by: David Chung <[email protected]>
1 parent 8ba08a1 commit ce06e23

File tree

6 files changed

+486
-82
lines changed

6 files changed

+486
-82
lines changed

cmd/cli/util/fileserver.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package util
2+
3+
import (
4+
"net/http"
5+
"os"
6+
7+
"github.com/docker/infrakit/pkg/discovery"
8+
"github.com/spf13/cobra"
9+
)
10+
11+
func fileServerCommand(plugins func() discovery.Plugins) *cobra.Command {
12+
cmd := &cobra.Command{
13+
Use: "fileserver <path>",
14+
Short: "Fileserver over http",
15+
}
16+
17+
listen := cmd.Flags().StringP("listen", "l", ":8080", "Listening port")
18+
19+
cmd.RunE = func(c *cobra.Command, args []string) error {
20+
21+
if len(args) != 1 {
22+
c.Usage()
23+
os.Exit(-1)
24+
}
25+
26+
logger.Info("Starting file server", "listen", *listen)
27+
28+
rootFS := args[0]
29+
handler := http.FileServer(http.Dir(rootFS))
30+
return http.ListenAndServe(*listen, handler)
31+
}
32+
33+
return cmd
34+
}

cmd/cli/util/mux.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package util
2+
3+
import (
4+
"time"
5+
6+
"github.com/docker/infrakit/pkg/discovery"
7+
"github.com/docker/infrakit/pkg/rpc/mux"
8+
"github.com/spf13/cobra"
9+
)
10+
11+
func muxCommand(plugins func() discovery.Plugins) *cobra.Command {
12+
cmd := &cobra.Command{
13+
Use: "mux",
14+
Short: "API mux service",
15+
}
16+
17+
// http://www.speedguide.net/port.php?port=24864 - unassigned by IANA
18+
listen := cmd.Flags().StringP("listen", "l", ":24864", "Listening port")
19+
autoStop := cmd.Flags().BoolP("auto-stop", "a", false, "True to stop when no plugins are running")
20+
interval := cmd.Flags().DurationP("scan", "s", 1*time.Minute, "Scan interval to check for plugins")
21+
22+
cmd.RunE = func(c *cobra.Command, args []string) error {
23+
logger.Info("Starting mux server", "listen", *listen)
24+
server, err := mux.NewServer(*listen, plugins)
25+
if err != nil {
26+
return err
27+
}
28+
defer server.Stop()
29+
30+
block := make(chan struct{})
31+
// If the sockets are gone, then we can safely exit.
32+
go func() {
33+
checkNow := time.Tick(*interval)
34+
for {
35+
select {
36+
case <-server.Wait():
37+
logger.Info("server stopped")
38+
close(block)
39+
return
40+
41+
case <-checkNow:
42+
if m, err := plugins().List(); err == nil {
43+
if len(m) == 0 && *autoStop {
44+
logger.Info("scan found no plugins.")
45+
close(block)
46+
return
47+
}
48+
}
49+
50+
}
51+
}
52+
}()
53+
54+
<-block
55+
56+
return nil
57+
}
58+
59+
return cmd
60+
}

cmd/cli/util/track.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package util
2+
3+
import (
4+
"os"
5+
"strings"
6+
"time"
7+
8+
"github.com/docker/infrakit/pkg/cli"
9+
"github.com/docker/infrakit/pkg/discovery"
10+
"github.com/docker/infrakit/pkg/plugin"
11+
"github.com/docker/infrakit/pkg/plugin/event/instance"
12+
event_rpc "github.com/docker/infrakit/pkg/rpc/event"
13+
instance_rpc "github.com/docker/infrakit/pkg/rpc/instance"
14+
"github.com/docker/infrakit/pkg/spi/event"
15+
"github.com/spf13/cobra"
16+
)
17+
18+
func trackCommand(plugins func() discovery.Plugins) *cobra.Command {
19+
cmd := &cobra.Command{
20+
Use: "track",
21+
Short: "Track instances",
22+
}
23+
24+
name := cmd.Flags().String("name", "", "Name to use as name of this plugin")
25+
targets := cmd.Flags().StringSliceP("instance", "n", []string{}, "Instance plugins to track")
26+
poll := cmd.Flags().DurationP("poll", "i", 3*time.Second, "Polling interval")
27+
flagTags := cmd.Flags().StringSliceP("tag", "t", []string{}, "Tags to filter instance by")
28+
29+
cmd.RunE = func(c *cobra.Command, args []string) error {
30+
31+
if len(args) != 0 {
32+
cmd.Usage()
33+
os.Exit(-1)
34+
}
35+
36+
tags := map[string]string{}
37+
38+
for _, tag := range *flagTags {
39+
kv := strings.SplitN(tag, "=", 2)
40+
if len(kv) != 2 {
41+
logger.Warn("bad format tag", "input", tag)
42+
continue
43+
}
44+
key := strings.TrimSpace(kv[0])
45+
val := strings.TrimSpace(kv[1])
46+
if key != "" && val != "" {
47+
tags[key] = val
48+
}
49+
}
50+
51+
trackers := map[string]event.Plugin{}
52+
53+
for _, target := range *targets {
54+
55+
endpoint, err := plugins().Find(plugin.Name(target))
56+
if err != nil {
57+
return err
58+
}
59+
60+
if p, err := instance_rpc.NewClient(plugin.Name(target), endpoint.Address); err == nil {
61+
trackers[target] = instance.NewTracker(target, p, time.Tick(*poll), tags)
62+
} else {
63+
return err
64+
}
65+
}
66+
67+
cli.RunPlugin(*name,
68+
// As event plugin
69+
event_rpc.PluginServerWithTypes(trackers),
70+
)
71+
return nil
72+
}
73+
74+
return cmd
75+
}

cmd/cli/util/util.go

Lines changed: 1 addition & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
package util
22

33
import (
4-
"net/http"
5-
"os"
6-
"time"
7-
84
"github.com/docker/infrakit/cmd/cli/base"
95
"github.com/docker/infrakit/pkg/discovery"
106
"github.com/docker/infrakit/pkg/log"
11-
"github.com/docker/infrakit/pkg/rpc/mux"
127
"github.com/spf13/cobra"
138
)
149

@@ -18,82 +13,6 @@ func init() {
1813
base.Register(Command)
1914
}
2015

21-
func fileServerCommand(plugins func() discovery.Plugins) *cobra.Command {
22-
cmd := &cobra.Command{
23-
Use: "fileserver <path>",
24-
Short: "Fileserver over http",
25-
}
26-
27-
listen := cmd.Flags().StringP("listen", "l", ":8080", "Listening port")
28-
29-
cmd.RunE = func(c *cobra.Command, args []string) error {
30-
31-
if len(args) != 1 {
32-
c.Usage()
33-
os.Exit(-1)
34-
}
35-
36-
logger.Info("Starting file server", "listen", *listen)
37-
38-
rootFS := args[0]
39-
handler := http.FileServer(http.Dir(rootFS))
40-
return http.ListenAndServe(*listen, handler)
41-
}
42-
43-
return cmd
44-
}
45-
46-
func muxCommand(plugins func() discovery.Plugins) *cobra.Command {
47-
cmd := &cobra.Command{
48-
Use: "mux",
49-
Short: "API mux service",
50-
}
51-
52-
// http://www.speedguide.net/port.php?port=24864 - unassigned by IANA
53-
listen := cmd.Flags().StringP("listen", "l", ":24864", "Listening port")
54-
autoStop := cmd.Flags().BoolP("auto-stop", "a", false, "True to stop when no plugins are running")
55-
interval := cmd.Flags().DurationP("scan", "s", 1*time.Minute, "Scan interval to check for plugins")
56-
57-
cmd.RunE = func(c *cobra.Command, args []string) error {
58-
logger.Info("Starting mux server", "listen", *listen)
59-
server, err := mux.NewServer(*listen, plugins)
60-
if err != nil {
61-
return err
62-
}
63-
defer server.Stop()
64-
65-
block := make(chan struct{})
66-
// If the sockets are gone, then we can safely exit.
67-
go func() {
68-
checkNow := time.Tick(*interval)
69-
for {
70-
select {
71-
case <-server.Wait():
72-
logger.Info("server stopped")
73-
close(block)
74-
return
75-
76-
case <-checkNow:
77-
if m, err := plugins().List(); err == nil {
78-
if len(m) == 0 && *autoStop {
79-
logger.Info("scan found no plugins.")
80-
close(block)
81-
return
82-
}
83-
}
84-
85-
}
86-
}
87-
}()
88-
89-
<-block
90-
91-
return nil
92-
}
93-
94-
return cmd
95-
}
96-
9716
// Command is the head of this module
9817
func Command(plugins func() discovery.Plugins) *cobra.Command {
9918

@@ -102,7 +21,7 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
10221
Short: "Utilties",
10322
}
10423

105-
util.AddCommand(muxCommand(plugins), fileServerCommand(plugins))
24+
util.AddCommand(muxCommand(plugins), fileServerCommand(plugins), trackCommand(plugins))
10625

10726
return util
10827
}

0 commit comments

Comments
 (0)