Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit d13d069

Browse files
author
David Chung
authored
Plugin activation (#356)
Signed-off-by: David Chung <[email protected]>
1 parent fd5894e commit d13d069

File tree

13 files changed

+756
-6
lines changed

13 files changed

+756
-6
lines changed

cmd/cli/plugin.go

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ package main
22

33
import (
44
"fmt"
5+
"sync"
6+
57
"github.com/docker/infrakit/pkg/discovery"
8+
"github.com/docker/infrakit/pkg/launch"
9+
"github.com/docker/infrakit/pkg/launch/os"
10+
"github.com/docker/infrakit/pkg/template"
611
"github.com/spf13/cobra"
712
)
813

@@ -13,7 +18,7 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command {
1318
Short: "Manage plugins",
1419
}
1520

16-
ls := cobra.Command{
21+
ls := &cobra.Command{
1722
Use: "ls",
1823
Short: "List available plugins",
1924
}
@@ -34,7 +39,94 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command {
3439
return nil
3540
}
3641

37-
cmd.AddCommand(&ls)
42+
start := &cobra.Command{
43+
Use: "start",
44+
Short: "Start named plugins. Args are a list of plugin names",
45+
}
46+
47+
configURL := start.Flags().String("config-url", "", "URL for the startup configs")
48+
osExec := start.Flags().Bool("os", false, "True to use os plugin binaries")
49+
doWait := start.Flags().BoolP("wait", "w", false, "True to wait in the foreground; Ctrl-C to exit")
50+
51+
start.RunE = func(c *cobra.Command, args []string) error {
52+
53+
configTemplate, err := template.NewTemplate(*configURL, template.Options{
54+
SocketDir: discovery.Dir(),
55+
})
56+
if err != nil {
57+
return err
58+
}
59+
60+
view, err := configTemplate.Render(nil)
61+
if err != nil {
62+
return err
63+
}
64+
65+
configs := launch.Config([]byte(view))
66+
67+
parsedRules := []launch.Rule{}
68+
err = configs.Unmarshal(&parsedRules)
69+
if err != nil {
70+
return err
71+
}
72+
73+
monitors := []*launch.Monitor{}
74+
75+
if *osExec {
76+
exec, err := os.NewLauncher()
77+
if err != nil {
78+
return err
79+
}
80+
monitors = append(monitors, launch.NewMonitor(exec, parsedRules))
81+
}
82+
83+
input := []chan<- launch.StartPlugin{}
84+
for _, m := range monitors {
85+
ch, err := m.Start()
86+
if err != nil {
87+
return err
88+
}
89+
input = append(input, ch)
90+
}
91+
92+
var wait sync.WaitGroup
93+
94+
if *doWait {
95+
wait.Add(1)
96+
}
97+
98+
// now start all the plugins
99+
for _, plugin := range args {
100+
fmt.Println("Starting up", plugin)
101+
102+
wait.Add(1)
103+
104+
for _, ch := range input {
105+
106+
name := plugin
107+
ch <- launch.StartPlugin{
108+
Plugin: name,
109+
Started: func(config *launch.Config) {
110+
fmt.Println(name, "started.")
111+
wait.Done()
112+
},
113+
Error: func(config *launch.Config, err error) {
114+
fmt.Println("Error starting", name, "err=", err)
115+
wait.Done()
116+
},
117+
}
118+
}
119+
}
120+
121+
wait.Wait() // wait for everyone to complete
122+
123+
for _, monitor := range monitors {
124+
monitor.Stop()
125+
}
126+
return nil
127+
}
128+
129+
cmd.AddCommand(ls, start)
38130

39131
return cmd
40132
}

pkg/launch/config.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package launch
2+
3+
import (
4+
"encoding/json"
5+
)
6+
7+
// Config is the raw configuration on how to launch the plugin.
8+
type Config json.RawMessage
9+
10+
// Unmarshal decodes the raw config container in this object to the typed struct.
11+
func (c *Config) Unmarshal(typed interface{}) error {
12+
if c == nil || len([]byte(*c)) == 0 {
13+
return nil // no effect on typed
14+
}
15+
return json.Unmarshal([]byte(*c), typed)
16+
}
17+
18+
// Marshal populates this raw message with a decoded form of the input struct.
19+
func (c *Config) Marshal(typed interface{}) error {
20+
buff, err := json.MarshalIndent(typed, "", " ")
21+
if err != nil {
22+
return err
23+
}
24+
*c = Config(json.RawMessage(buff))
25+
return nil
26+
}
27+
28+
// String returns the string representation.
29+
func (c *Config) String() string {
30+
return string([]byte(*c))
31+
}
32+
33+
// MarshalJSON implements the json Marshaler interface
34+
func (c *Config) MarshalJSON() ([]byte, error) {
35+
if c == nil {
36+
return nil, nil
37+
}
38+
return []byte(*c), nil
39+
}
40+
41+
// UnmarshalJSON implements the json Unmarshaler interface
42+
func (c *Config) UnmarshalJSON(data []byte) error {
43+
*c = Config(json.RawMessage(data))
44+
return nil
45+
}

pkg/launch/exec.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package launch
2+
3+
// Exec is a service that is able to start plugins based on different
4+
// mechanisms from running local binary to pulling and running docker containers or engine plugins
5+
type Exec interface {
6+
7+
// Name returns the name of the launcher. This is used to identify
8+
// which launcher to use in configurations or command line flags
9+
Name() string
10+
11+
// Exec starts the plugin given the name of the plugin and
12+
// the command and args to start it.
13+
// This can be an async process but the launcher will poll for the running
14+
// status of the plugin.
15+
// The client can receive and block on the returned channel
16+
// and add optional timeout in its own select statement.
17+
Exec(name string, config *Config) (<-chan error, error)
18+
}

pkg/launch/monitor.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package launch
2+
3+
import (
4+
"errors"
5+
"sync"
6+
7+
log "github.com/Sirupsen/logrus"
8+
)
9+
10+
var errNoConfig = errors.New("no-counfig")
11+
12+
// Rule provides the instructions on starting the plugin
13+
type Rule struct {
14+
15+
// Plugin is the name of the plugin
16+
Plugin string
17+
18+
// Exec is the name of the exec to use to start the plugin
19+
Exec string
20+
21+
// Launch is encoded form of the rule on how to start/exec the process
22+
Launch *Config
23+
}
24+
25+
// Monitor runs continuously receiving requests to start a plugin.
26+
// Monitor uses a launcher to actually start the process of the plugin.
27+
type Monitor struct {
28+
exec Exec
29+
rules map[string]Rule
30+
startChan <-chan StartPlugin
31+
inputChan chan<- StartPlugin
32+
stop chan interface{}
33+
lock sync.Mutex
34+
}
35+
36+
// NewMonitor returns a monitor that continuously watches for input
37+
// requests and launches the process for the plugin, if not already running.
38+
func NewMonitor(l Exec, rules []Rule) *Monitor {
39+
m := map[string]Rule{}
40+
// index by name of plugin
41+
for _, r := range rules {
42+
if r.Exec == l.Name() {
43+
m[r.Plugin] = r
44+
}
45+
}
46+
return &Monitor{
47+
exec: l,
48+
rules: m,
49+
}
50+
}
51+
52+
// StartPlugin is the command to start a plugin
53+
type StartPlugin struct {
54+
Plugin string
55+
Started func(*Config)
56+
Error func(*Config, error)
57+
}
58+
59+
func (s StartPlugin) reportError(config *Config, e error) {
60+
if s.Error != nil {
61+
go s.Error(config, e)
62+
}
63+
}
64+
65+
func (s StartPlugin) reportSuccess(config *Config) {
66+
if s.Started != nil {
67+
go s.Started(config)
68+
}
69+
}
70+
71+
// Start starts the monitor and returns a channel for sending
72+
// requests to launch plugins. Closing the channel effectively stops
73+
// the monitor loop.
74+
func (m *Monitor) Start() (chan<- StartPlugin, error) {
75+
m.lock.Lock()
76+
defer m.lock.Unlock()
77+
if m.startChan != nil {
78+
return m.inputChan, nil
79+
}
80+
81+
ch := make(chan StartPlugin)
82+
m.startChan = ch
83+
m.inputChan = ch
84+
85+
go func() {
86+
loop:
87+
for {
88+
req, open := <-m.startChan
89+
if !open {
90+
log.Infoln("Plugin activation input closed. Stopping.")
91+
m.inputChan = nil
92+
return
93+
}
94+
95+
r, has := m.rules[req.Plugin]
96+
if !has {
97+
log.Warningln("no plugin:", req)
98+
req.reportError(r.Launch, errNoConfig)
99+
continue loop
100+
}
101+
102+
configCopy := &Config{}
103+
if r.Launch != nil {
104+
*configCopy = *r.Launch
105+
}
106+
107+
block, err := m.exec.Exec(r.Plugin, configCopy)
108+
if err != nil {
109+
req.reportError(configCopy, err)
110+
continue loop
111+
}
112+
113+
log.Infoln("Waiting for", r.Plugin, "to start:", configCopy.String())
114+
err = <-block
115+
if err != nil {
116+
req.reportError(configCopy, err)
117+
continue loop
118+
}
119+
120+
req.reportSuccess(configCopy)
121+
}
122+
}()
123+
124+
return m.inputChan, nil
125+
}
126+
127+
// Stop stops the monitor
128+
func (m *Monitor) Stop() {
129+
m.lock.Lock()
130+
defer m.lock.Unlock()
131+
if m.inputChan != nil {
132+
close(m.inputChan)
133+
}
134+
}

0 commit comments

Comments
 (0)