Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 5a6c860

Browse files
author
David Chung
authored
Multiplex instance plugins (#360)
Signed-off-by: David Chung <[email protected]>
1 parent 275105d commit 5a6c860

File tree

15 files changed

+498
-41
lines changed

15 files changed

+498
-41
lines changed

cmd/cli/instance.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
log "github.com/Sirupsen/logrus"
1212
"github.com/docker/infrakit/pkg/discovery"
13+
"github.com/docker/infrakit/pkg/plugin"
1314
instance_plugin "github.com/docker/infrakit/pkg/rpc/instance"
1415
"github.com/docker/infrakit/pkg/spi/instance"
1516
"github.com/spf13/cobra"
@@ -31,7 +32,7 @@ func instancePluginCommand(plugins func() discovery.Plugins) *cobra.Command {
3132
return err
3233
}
3334

34-
instancePlugin = instance_plugin.NewClient(endpoint.Address)
35+
instancePlugin = instance_plugin.NewClient(plugin.Name(*name), endpoint.Address)
3536

3637
return nil
3738
}

cmd/cli/plugin.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/docker/infrakit/pkg/discovery"
88
"github.com/docker/infrakit/pkg/launch"
99
"github.com/docker/infrakit/pkg/launch/os"
10+
"github.com/docker/infrakit/pkg/plugin"
1011
"github.com/docker/infrakit/pkg/template"
1112
"github.com/spf13/cobra"
1213
)
@@ -96,16 +97,16 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command {
9697
}
9798

9899
// now start all the plugins
99-
for _, plugin := range args {
100-
fmt.Println("Starting up", plugin)
100+
for _, pluginToStart := range args {
101+
fmt.Println("Starting up", pluginToStart)
101102

102103
wait.Add(1)
103104

104105
for _, ch := range input {
105106

106-
name := plugin
107+
name := pluginToStart
107108
ch <- launch.StartPlugin{
108-
Plugin: name,
109+
Plugin: plugin.Name(name),
109110
Started: func(config *launch.Config) {
110111
fmt.Println(name, "started.")
111112
wait.Done()

cmd/group/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
log "github.com/Sirupsen/logrus"
88
"github.com/docker/infrakit/pkg/cli"
99
"github.com/docker/infrakit/pkg/discovery"
10+
"github.com/docker/infrakit/pkg/plugin"
1011
"github.com/docker/infrakit/pkg/plugin/group"
1112
flavor_client "github.com/docker/infrakit/pkg/rpc/flavor"
1213
group_server "github.com/docker/infrakit/pkg/rpc/group"
@@ -40,7 +41,7 @@ func main() {
4041
if err != nil {
4142
return nil, err
4243
}
43-
return instance_client.NewClient(endpoint.Address), nil
44+
return instance_client.NewClient(plugin.Name(n), endpoint.Address), nil
4445
}
4546

4647
flavorPluginLookup := func(n string) (flavor.Plugin, error) {

pkg/discovery/dir.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ type dirPluginDiscovery struct {
1818

1919
// Find returns a plugin by name
2020
func (r *dirPluginDiscovery) Find(name string) (*plugin.Endpoint, error) {
21-
21+
lookup, _ := plugin.Name(name).GetLookupAndType()
2222
plugins, err := r.List()
2323
if err != nil {
2424
return nil, err
2525
}
2626

27-
p, exists := plugins[name]
27+
p, exists := plugins[lookup]
2828
if !exists {
29-
return nil, fmt.Errorf("Plugin not found: %s", name)
29+
return nil, fmt.Errorf("Plugin not found: %s (looked up using %s)", name, lookup)
3030
}
3131

3232
return p, nil

pkg/discovery/discovery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
// Plugins provides access to plugin discovery.
1313
type Plugins interface {
14+
// Find looks up the plugin by name. The name can be of the form $lookup[/$subtype]. See GetLookupAndType().
1415
Find(name string) (*plugin.Endpoint, error)
1516
List() (map[string]*plugin.Endpoint, error)
1617
}

pkg/launch/monitor.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync"
66

77
log "github.com/Sirupsen/logrus"
8+
"github.com/docker/infrakit/pkg/plugin"
89
)
910

1011
var errNoConfig = errors.New("no-counfig")
@@ -21,7 +22,7 @@ type ExecRule struct {
2122
type Rule struct {
2223

2324
// Plugin is the name of the plugin
24-
Plugin string
25+
Plugin plugin.Name
2526

2627
// Launch is the rule for starting / launching the plugin.
2728
Launch ExecRule
@@ -31,7 +32,7 @@ type Rule struct {
3132
// Monitor uses a launcher to actually start the process of the plugin.
3233
type Monitor struct {
3334
exec Exec
34-
rules map[string]Rule
35+
rules map[plugin.Name]Rule
3536
startChan <-chan StartPlugin
3637
inputChan chan<- StartPlugin
3738
stop chan interface{}
@@ -41,7 +42,7 @@ type Monitor struct {
4142
// NewMonitor returns a monitor that continuously watches for input
4243
// requests and launches the process for the plugin, if not already running.
4344
func NewMonitor(l Exec, rules []Rule) *Monitor {
44-
m := map[string]Rule{}
45+
m := map[plugin.Name]Rule{}
4546
// index by name of plugin
4647
for _, r := range rules {
4748
if r.Launch.Exec == l.Name() {
@@ -56,7 +57,7 @@ func NewMonitor(l Exec, rules []Rule) *Monitor {
5657

5758
// StartPlugin is the command to start a plugin
5859
type StartPlugin struct {
59-
Plugin string
60+
Plugin plugin.Name
6061
Started func(*Config)
6162
Error func(*Config, error)
6263
}
@@ -97,7 +98,13 @@ func (m *Monitor) Start() (chan<- StartPlugin, error) {
9798
return
9899
}
99100

101+
// match first by full name of the form lookup/type -- 'specialization'
100102
r, has := m.rules[req.Plugin]
103+
if !has {
104+
// match now by lookup only -- 'base class'
105+
alternate, _ := req.Plugin.GetLookupAndType()
106+
r, has = m.rules[plugin.Name(alternate)]
107+
}
101108
if !has {
102109
log.Warningln("no plugin:", req)
103110
req.reportError(r.Launch.Properties, errNoConfig)
@@ -109,7 +116,7 @@ func (m *Monitor) Start() (chan<- StartPlugin, error) {
109116
*configCopy = *r.Launch.Properties
110117
}
111118

112-
block, err := m.exec.Exec(r.Plugin, configCopy)
119+
block, err := m.exec.Exec(r.Plugin.String(), configCopy)
113120
if err != nil {
114121
req.reportError(configCopy, err)
115122
continue loop

pkg/launch/monitor_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,53 @@ func TestMonitorLoopValidRule(t *testing.T) {
108108

109109
monitor.Stop()
110110
}
111+
112+
func TestMonitorLoopRuleLookupBehavior(t *testing.T) {
113+
raw := &Config{}
114+
config := &testConfig{
115+
Cmd: "hello",
116+
Args: []string{"world", "hello"},
117+
}
118+
119+
rawErr := raw.Marshal(config)
120+
require.NoError(t, rawErr)
121+
require.True(t, len([]byte(*raw)) > 0)
122+
123+
var receivedArgs *Config
124+
rule := Rule{
125+
Plugin: "hello",
126+
Launch: ExecRule{
127+
Exec: "test",
128+
Properties: raw,
129+
},
130+
}
131+
monitor := NewMonitor(&testLauncher{
132+
name: "test",
133+
t: t,
134+
callback: func(c *Config) {
135+
receivedArgs = c
136+
},
137+
}, []Rule{rule})
138+
139+
input, err := monitor.Start()
140+
require.NoError(t, err)
141+
require.NotNil(t, input)
142+
143+
started := make(chan interface{})
144+
input <- StartPlugin{
145+
Plugin: "hello",
146+
Started: func(config *Config) {
147+
close(started)
148+
},
149+
}
150+
151+
<-started
152+
153+
expected := &Config{}
154+
err = expected.Marshal(config)
155+
require.NoError(t, err)
156+
157+
require.Equal(t, *expected, *receivedArgs)
158+
159+
monitor.Stop()
160+
}

pkg/plugin/name.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package plugin
2+
3+
import (
4+
"strings"
5+
)
6+
7+
// Name is a reference to the plugin. Places where it appears include JSON files as type of field `Plugin`.
8+
type Name string
9+
10+
// GetLookupAndType returns the plugin name for lookup and sub-type supported by the plugin.
11+
// The name follows a microformat of $plugin[/$subtype] where $plugin is used for the discovery / lookup by name.
12+
// The $subtype is used for the Type parameter in the RPC requests.
13+
// Example: instance-file/json means lookup socket file 'instance-file' and the type is 'json'.
14+
func (r Name) GetLookupAndType() (string, string) {
15+
name := string(r)
16+
if first := strings.Index(name, "/"); first >= 0 {
17+
return name[0:first], name[first+1:]
18+
}
19+
return name, ""
20+
}
21+
22+
// String returns the string representation
23+
func (r Name) String() string {
24+
return string(r)
25+
}

pkg/plugin/name_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package plugin
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestGetLookupAndType(t *testing.T) {
10+
11+
ref := Name("instance-file")
12+
lookup, instanceType := ref.GetLookupAndType()
13+
require.Equal(t, "instance-file", lookup)
14+
require.Equal(t, "", instanceType)
15+
16+
ref = Name("instance-file/json")
17+
lookup, instanceType = ref.GetLookupAndType()
18+
require.Equal(t, "instance-file", lookup)
19+
require.Equal(t, "json", instanceType)
20+
21+
ref = Name("instance-file/text/html")
22+
lookup, instanceType = ref.GetLookupAndType()
23+
require.Equal(t, "instance-file", lookup)
24+
require.Equal(t, "text/html", instanceType)
25+
}

pkg/rpc/instance/client.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,35 @@ package instance
22

33
import (
44
"encoding/json"
5+
6+
"github.com/docker/infrakit/pkg/plugin"
57
rpc_client "github.com/docker/infrakit/pkg/rpc/client"
68
"github.com/docker/infrakit/pkg/spi/instance"
79
)
810

911
// NewClient returns a plugin interface implementation connected to a plugin
10-
func NewClient(socketPath string) instance.Plugin {
11-
return &client{client: rpc_client.New(socketPath, instance.InterfaceSpec)}
12+
func NewClient(name plugin.Name, socketPath string) instance.Plugin {
13+
return &client{name: name, client: rpc_client.New(socketPath, instance.InterfaceSpec)}
1214
}
1315

1416
type client struct {
17+
name plugin.Name
1518
client rpc_client.Client
1619
}
1720

1821
// Validate performs local validation on a provision request.
1922
func (c client) Validate(properties json.RawMessage) error {
20-
req := ValidateRequest{Properties: &properties}
23+
_, instanceType := c.name.GetLookupAndType()
24+
req := ValidateRequest{Properties: &properties, Type: instanceType}
2125
resp := ValidateResponse{}
2226

2327
return c.client.Call("Instance.Validate", req, &resp)
2428
}
2529

2630
// Provision creates a new instance based on the spec.
2731
func (c client) Provision(spec instance.Spec) (*instance.ID, error) {
28-
req := ProvisionRequest{Spec: spec}
32+
_, instanceType := c.name.GetLookupAndType()
33+
req := ProvisionRequest{Spec: spec, Type: instanceType}
2934
resp := ProvisionResponse{}
3035

3136
if err := c.client.Call("Instance.Provision", req, &resp); err != nil {
@@ -37,15 +42,17 @@ func (c client) Provision(spec instance.Spec) (*instance.ID, error) {
3742

3843
// Destroy terminates an existing instance.
3944
func (c client) Destroy(instance instance.ID) error {
40-
req := DestroyRequest{Instance: instance}
45+
_, instanceType := c.name.GetLookupAndType()
46+
req := DestroyRequest{Instance: instance, Type: instanceType}
4147
resp := DestroyResponse{}
4248

4349
return c.client.Call("Instance.Destroy", req, &resp)
4450
}
4551

4652
// DescribeInstances returns descriptions of all instances matching all of the provided tags.
4753
func (c client) DescribeInstances(tags map[string]string) ([]instance.Description, error) {
48-
req := DescribeInstancesRequest{Tags: tags}
54+
_, instanceType := c.name.GetLookupAndType()
55+
req := DescribeInstancesRequest{Tags: tags, Type: instanceType}
4956
resp := DescribeInstancesResponse{}
5057

5158
err := c.client.Call("Instance.DescribeInstances", req, &resp)

0 commit comments

Comments
 (0)