Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 93f6892

Browse files
author
David Chung
authored
Support multiple groups in a single specification (#368)
Signed-off-by: David Chung <[email protected]>
1 parent 61741dd commit 93f6892

File tree

23 files changed

+433
-104
lines changed

23 files changed

+433
-104
lines changed

cmd/cli/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func main() {
4141
cmd.AddCommand(cli.VersionCommand(), cli.InfoCommand(f))
4242

4343
cmd.AddCommand(templateCommand(f))
44+
cmd.AddCommand(managerCommand(f))
4445
cmd.AddCommand(pluginCommand(f), instancePluginCommand(f), groupPluginCommand(f), flavorPluginCommand(f))
4546

4647
err := cmd.Execute()

cmd/cli/manager.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
log "github.com/Sirupsen/logrus"
8+
"github.com/docker/infrakit/pkg/discovery"
9+
"github.com/docker/infrakit/pkg/manager"
10+
"github.com/docker/infrakit/pkg/plugin"
11+
"github.com/docker/infrakit/pkg/rpc/client"
12+
group_plugin "github.com/docker/infrakit/pkg/rpc/group"
13+
manager_rpc "github.com/docker/infrakit/pkg/rpc/manager"
14+
"github.com/docker/infrakit/pkg/spi/group"
15+
"github.com/docker/infrakit/pkg/template"
16+
"github.com/docker/infrakit/pkg/types"
17+
"github.com/spf13/cobra"
18+
)
19+
20+
func managerCommand(plugins func() discovery.Plugins) *cobra.Command {
21+
22+
var groupPlugin group.Plugin
23+
var groupPluginName string
24+
25+
cmd := &cobra.Command{
26+
Use: "manager",
27+
Short: "Access the manager",
28+
}
29+
cmd.PersistentPreRunE = func(c *cobra.Command, args []string) error {
30+
31+
// Scan for a manager
32+
pm, err := plugins().List()
33+
if err != nil {
34+
return err
35+
}
36+
37+
for name, endpoint := range pm {
38+
39+
rpcClient, err := client.New(endpoint.Address, manager.InterfaceSpec)
40+
if err == nil {
41+
42+
m := manager_rpc.Adapt(rpcClient)
43+
44+
isLeader, err := m.IsLeader()
45+
if err != nil {
46+
return err
47+
}
48+
49+
log.Infoln("Found manager", name, "is leader = ", isLeader)
50+
if isLeader {
51+
52+
groupPlugin = group_plugin.Adapt(rpcClient)
53+
groupPluginName = name
54+
55+
log.Infoln("Found manager as", name, "at", endpoint.Address)
56+
57+
break
58+
}
59+
}
60+
}
61+
return nil
62+
}
63+
64+
commit := cobra.Command{
65+
Use: "commit <template_URL>",
66+
Short: "commit a multi-group configuration, as specified by the URL",
67+
}
68+
pretend := commit.Flags().Bool("pretend", false, "Don't actually commit, only explain the commit")
69+
commit.RunE = func(cmd *cobra.Command, args []string) error {
70+
assertNotNil("no plugin", groupPlugin)
71+
72+
if len(args) != 1 {
73+
cmd.Usage()
74+
os.Exit(1)
75+
}
76+
77+
templateURL := args[0]
78+
79+
log.Infof("Using %v for reading template\n", templateURL)
80+
engine, err := template.NewTemplate(templateURL, template.Options{
81+
SocketDir: discovery.Dir(),
82+
})
83+
if err != nil {
84+
return err
85+
}
86+
view, err := engine.Render(nil)
87+
if err != nil {
88+
return err
89+
}
90+
91+
log.Debugln(view)
92+
93+
// Treat this as an Any and then convert
94+
any := types.AnyString(view)
95+
96+
groups := []plugin.Spec{}
97+
err = any.Decode(&groups)
98+
if err != nil {
99+
log.Warningln("Error parsing the template for plugin specs.")
100+
return err
101+
}
102+
103+
// Check the list of plugins
104+
for _, gp := range groups {
105+
endpoint, err := plugins().Find(gp.Plugin)
106+
if err != nil {
107+
return err
108+
}
109+
110+
// unmarshal the group spec
111+
spec := group.Spec{}
112+
if gp.Properties != nil {
113+
err = gp.Properties.Decode(&spec)
114+
if err != nil {
115+
return err
116+
}
117+
}
118+
119+
// TODO(chungers) -- we need to enforce and confirm the type of this.
120+
// Right now we assume the RPC endpoint is indeed a group.
121+
target, err := group_plugin.NewClient(endpoint.Address)
122+
if err != nil {
123+
return err
124+
}
125+
126+
plan, err := target.CommitGroup(spec, *pretend)
127+
if err != nil {
128+
return err
129+
}
130+
131+
fmt.Println("Group", spec.ID, "with plugin", gp.Plugin, "plan:", plan)
132+
}
133+
134+
return nil
135+
}
136+
137+
inspect := cobra.Command{
138+
Use: "inspect",
139+
Short: "inspect returns the plugin configurations known by the manager",
140+
}
141+
inspect.RunE = func(cmd *cobra.Command, args []string) error {
142+
assertNotNil("no plugin", groupPlugin)
143+
144+
if len(args) != 0 {
145+
cmd.Usage()
146+
os.Exit(1)
147+
}
148+
149+
specs, err := groupPlugin.InspectGroups()
150+
if err != nil {
151+
return err
152+
}
153+
154+
// the format is pluing.Spec
155+
out := []plugin.Spec{}
156+
for _, spec := range specs {
157+
158+
any, err := types.AnyValue(spec)
159+
if err != nil {
160+
return err
161+
}
162+
163+
out = append(out, plugin.Spec{
164+
Plugin: plugin.Name(groupPluginName),
165+
Properties: any,
166+
})
167+
}
168+
169+
view, err := types.AnyValue(out)
170+
if err != nil {
171+
return err
172+
}
173+
fmt.Println(view.String())
174+
175+
return nil
176+
}
177+
178+
cmd.AddCommand(&commit, &inspect)
179+
180+
return cmd
181+
}

cmd/cli/plugin.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,28 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command {
138138
Short: "Stop named plugins. Args are a list of plugin names. This assumes plugins are local processes and not containers managed by another daemon, like Docker or runc.",
139139
}
140140

141+
all := stop.Flags().Bool("all", false, "True to stop all running plugins")
141142
stop.RunE = func(c *cobra.Command, args []string) error {
142143

143144
allPlugins, err := plugins().List()
144145
if err != nil {
145146
return err
146147
}
147148

148-
for _, n := range args {
149+
targets := args
150+
151+
if *all {
152+
names := []string{}
153+
for n := range allPlugins {
154+
names = append(names, n)
155+
}
156+
targets = names
157+
}
158+
159+
for _, n := range targets {
149160

150161
p, has := allPlugins[n]
151162
if !has {
152-
log.Warningf("Plugin %s not running", n)
153163
continue
154164
}
155165

cmd/manager/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/docker/infrakit/pkg/leader"
1111
"github.com/docker/infrakit/pkg/manager"
1212
group_rpc "github.com/docker/infrakit/pkg/rpc/group"
13+
manager_rpc "github.com/docker/infrakit/pkg/rpc/manager"
1314
"github.com/docker/infrakit/pkg/store"
1415
"github.com/docker/infrakit/pkg/util/docker"
1516
"github.com/spf13/cobra"
@@ -77,7 +78,7 @@ func runMain(cfg config) error {
7778
return err
7879
}
7980

80-
cli.RunPlugin(cfg.id, group_rpc.PluginServer(mgr))
81+
cli.RunPlugin(cfg.id, group_rpc.PluginServer(mgr), manager_rpc.PluginServer(mgr))
8182

8283
mgr.Stop()
8384
log.Infoln("Manager stopped")

pkg/cli/serverutil.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ func EnsureDirExists(dir string) {
1818

1919
// RunPlugin runs a plugin server, advertising with the provided name for discovery.
2020
// The plugin should conform to the rpc call convention as implemented in the rpc package.
21-
func RunPlugin(name string, plugin server.VersionedInterface) {
21+
func RunPlugin(name string, plugin server.VersionedInterface, more ...server.VersionedInterface) {
2222

2323
dir := discovery.Dir()
2424
EnsureDirExists(dir)
2525

2626
socketPath := path.Join(dir, name)
2727
pidPath := path.Join(dir, name+".pid")
2828

29-
stoppable, err := server.StartPluginAtPath(socketPath, plugin)
29+
stoppable, err := server.StartPluginAtPath(socketPath, plugin, more...)
3030
if err != nil {
3131
log.Error(err)
3232
}

pkg/manager/group_plugin_impl.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package manager
22

33
import (
4-
"encoding/json"
5-
64
log "github.com/Sirupsen/logrus"
75
"github.com/docker/infrakit/pkg/plugin"
86
rpc "github.com/docker/infrakit/pkg/rpc/group"
97
"github.com/docker/infrakit/pkg/spi/group"
8+
"github.com/docker/infrakit/pkg/types"
109
)
1110

1211
// proxyForGroupPlugin registers a group plugin that this manager will proxy for.
@@ -17,7 +16,7 @@ func (m *manager) proxyForGroupPlugin(name string) (group.Plugin, error) {
1716

1817
// A late-binding proxy so that we don't have a problem with having to
1918
// start up the manager as the last of all the plugins.
20-
return NewProxy(func() (group.Plugin, error) {
19+
return newProxy(func() (group.Plugin, error) {
2120
endpoint, err := m.plugins.Find(plugin.Name(name))
2221
if err != nil {
2322
return nil, err
@@ -33,7 +32,7 @@ func (m *manager) updateConfig(spec group.Spec) error {
3332

3433
// Always read and then update with the current value. Assumes the user's input
3534
// is always authoritative.
36-
stored := GlobalSpec{}
35+
stored := globalSpec{}
3736

3837
err := m.snapshot.Load(&stored)
3938
if err != nil {
@@ -43,17 +42,16 @@ func (m *manager) updateConfig(spec group.Spec) error {
4342
// if not-found ok to continue...
4443

4544
if stored.Groups == nil {
46-
stored.Groups = map[group.ID]PluginSpec{}
45+
stored.Groups = map[group.ID]plugin.Spec{}
4746
}
4847

49-
buff, err := json.MarshalIndent(spec, " ", " ")
48+
any, err := types.AnyValue(spec)
5049
if err != nil {
5150
return err
5251
}
53-
raw := json.RawMessage(buff)
54-
stored.Groups[spec.ID] = PluginSpec{
55-
Plugin: m.backendName,
56-
Properties: &raw,
52+
stored.Groups[spec.ID] = plugin.Spec{
53+
Plugin: plugin.Name(m.backendName),
54+
Properties: any,
5755
}
5856
log.Debugln("Saving updated config", stored)
5957

pkg/manager/group_proxy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import (
66
"github.com/docker/infrakit/pkg/spi/group"
77
)
88

9-
// NewProxy returns a plugin interface. The proxy is late-binding in that
9+
// newProxy returns a plugin interface. The proxy is late-binding in that
1010
// it does not resolve plugin until a method is called.
11-
func NewProxy(finder func() (group.Plugin, error)) group.Plugin {
11+
func newProxy(finder func() (group.Plugin, error)) group.Plugin {
1212
return &proxy{finder: finder}
1313
}
1414

pkg/manager/group_proxy_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
func TestErrorOnCallsToNilPlugin(t *testing.T) {
1212

1313
errMessage := "no-plugin"
14-
proxy := NewProxy(func() (group.Plugin, error) {
14+
proxy := newProxy(func() (group.Plugin, error) {
1515
return nil, errors.New(errMessage)
1616
})
1717

@@ -45,7 +45,7 @@ func TestDelayPluginLookupCallingMethod(t *testing.T) {
4545
},
4646
}
4747

48-
proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })
48+
proxy := newProxy(func() (group.Plugin, error) { return fake, nil })
4949

5050
require.False(t, called)
5151

@@ -66,7 +66,7 @@ func TestDelayPluginLookupCallingMethodReturnsError(t *testing.T) {
6666
},
6767
}
6868

69-
proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })
69+
proxy := newProxy(func() (group.Plugin, error) { return fake, nil })
7070

7171
require.False(t, called)
7272

@@ -93,7 +93,7 @@ func TestDelayPluginLookupCallingMultipleMethods(t *testing.T) {
9393
},
9494
}
9595

96-
proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })
96+
proxy := newProxy(func() (group.Plugin, error) { return fake, nil })
9797

9898
require.False(t, called)
9999

0 commit comments

Comments
 (0)