Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 7a75396

Browse files
author
David Chung
authored
Tiered selector (#671)
Signed-off-by: David Chung <[email protected]>
1 parent f9e924e commit 7a75396

File tree

14 files changed

+589
-100
lines changed

14 files changed

+589
-100
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#
2+
# Tiered provisioning example
3+
#
4+
# Start up infrakit:
5+
#
6+
# export INFRAKIT_SELECTOR_TIERED_PLUGINS='spot/compute;ondemand/compute'
7+
#
8+
# In the env variable above we set the ordering of the plugins...
9+
# First, we try spot/compute then ondemand/compute. We will use the 'simulator'
10+
# kind to simulate the different types of instances by starting the plugin at
11+
# specific socket addresses: one as ondemand and the other as spot.
12+
# In the infrakit CLI you will see ondemand/compute and spot/compute as available
13+
# commands. These will allow you to directly query the instance plugins.
14+
#
15+
# infrakit plugin start manager group vanilla simulator:ondemand simulator:spot selector/tiered --log 5 --log-stack
16+
#
17+
# * Note on the use of env variable... when we support multiple plugins in a single
18+
# spec and automatic activation, you will create sections of spec/metadata/Options
19+
# to actually set the options for the tiered selector.
20+
#
21+
# Commit this file:
22+
#
23+
# infrakit group controller commit -y docs/instance/selector/tiered/group.yml
24+
#
25+
# After a while, you can verify that 3 are created in spot/compute and 2 in ondemand/compute
26+
# infrakit spot/compute describe
27+
# infrakit ondemand/compute describe
28+
#
29+
#
30+
#
31+
kind: group
32+
metadata:
33+
name: workers
34+
properties:
35+
Allocation:
36+
Size: 5
37+
Flavor:
38+
Plugin: vanilla
39+
Properties:
40+
Init:
41+
- apt-get install curl
42+
- curl -sSL https://get.docker.com | sh
43+
Tags:
44+
project: infrakit
45+
tier: app
46+
Instance:
47+
48+
# here we reference the tiered selector at the socket location, selector.
49+
Plugin: selector/tiered
50+
Properties:
51+
52+
spot/compute:
53+
# This is a special property that the simulator understands to limit
54+
# the number of instances it can provision. So for cluster size > 3
55+
# we expect to see the ondemand/compute instances getting added.
56+
Cap: 3
57+
instanceType: small
58+
bid: 0.02
59+
labels:
60+
project: infrakit
61+
billing: spot
62+
63+
# Properties for the ondemand/compute instances. The section after the
64+
# name of the plugin (ondemand/compute) is the properties blob to be fed
65+
# to the plugin.
66+
ondemand/compute:
67+
instanceType: small
68+
labels:
69+
project: infrakit
70+
billing: ondemand

pkg/discovery/local/dir.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (r *dirPluginDiscovery) List() (map[string]*plugin.Endpoint, error) {
136136
continue
137137
}
138138

139-
log.Debug("Discovered plugin", "address", instance.Address, "V", logutil.V(500))
139+
log.Debug("Discovered plugin", "address", instance.Address, "V", logutil.V(1000))
140140
plugins[instance.Name] = instance
141141
}
142142
}

pkg/plugin/instance/selector/internal/base.go

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"sort"
66
"strings"
7+
"sync"
78

89
"github.com/docker/infrakit/pkg/discovery"
910
logutil "github.com/docker/infrakit/pkg/log"
@@ -14,25 +15,55 @@ import (
1415
"github.com/docker/infrakit/pkg/types"
1516
)
1617

17-
var log = logutil.New("module", "plugin/instance/selector")
18+
var log = logutil.New("module", "plugin/instance/selector/base")
1819

1920
// Base is the base implementation of an instance plugin
2021
type Base struct {
2122
Plugins func() discovery.Plugins
2223
Choices []selector.Choice
2324
SelectFunc func(instance.Spec, []selector.Choice, func(selector.Choice) instance.Plugin) (selector.Choice, error)
24-
PluginClientFunc func(map[string]*plugin.Endpoint, plugin.Name) instance.Plugin
25+
PluginClientFunc func(plugin.Name) (instance.Plugin, error)
2526
}
2627

27-
func instancePlugin(plugins map[string]*plugin.Endpoint, name plugin.Name) instance.Plugin {
28+
var (
29+
clients = map[string]instance.Plugin{}
30+
lock sync.RWMutex
31+
)
32+
33+
func (b *Base) instancePlugin(name plugin.Name) (instance.Plugin, error) {
34+
2835
lookup, _ := name.GetLookupAndType()
36+
37+
lock.RLock()
38+
if p, has := clients[lookup]; has {
39+
lock.RUnlock()
40+
return p, nil
41+
}
42+
lock.RUnlock()
43+
44+
lock.Lock()
45+
defer lock.Unlock()
46+
47+
plugins, err := b.Plugins().List()
48+
if err != nil {
49+
return nil, err
50+
}
51+
2952
if endpoint, has := plugins[lookup]; has {
3053
if p, err := instance_rpc.NewClient(name, endpoint.Address); err == nil {
31-
return p
54+
return p, nil
3255
}
3356
log.Warn("not an instance plugin", "name", name, "endpoint", endpoint)
3457
}
35-
return nil
58+
return nil, nil
59+
}
60+
61+
// Init initializes the base by setting any unset properties with defaults
62+
func (b *Base) Init() *Base {
63+
if b.PluginClientFunc == nil {
64+
b.PluginClientFunc = b.instancePlugin
65+
}
66+
return b
3667
}
3768

3869
func (b *Base) selectOne(spec instance.Spec) (match selector.Choice, p instance.Plugin, err error) {
@@ -60,21 +91,44 @@ func (b *Base) selectOne(spec instance.Spec) (match selector.Choice, p instance.
6091
return
6192
}
6293

63-
func (b *Base) visit(f func(selector.Choice, instance.Plugin) error) error {
64-
if b.PluginClientFunc == nil {
65-
b.PluginClientFunc = instancePlugin
66-
}
94+
// VisitChoices visits all the choices linearly one by one. If the work function returns
95+
// false or error, the visit stops.
96+
func (b *Base) VisitChoices(visit func(selector.Choice, instance.Plugin) (bool, error)) error {
6797

68-
plugins, err := b.Plugins().List()
69-
if err != nil {
70-
return err
98+
for _, choice := range b.Choices {
99+
instancePlugin, err := b.PluginClientFunc(choice.Name)
100+
if err != nil {
101+
return err
102+
}
103+
104+
if instancePlugin == nil {
105+
// TODO -- implement retry??
106+
log.Warn("cannot contact plugin", "name", choice.Name)
107+
continue
108+
109+
}
110+
if continueRun, err := visit(choice, instancePlugin); err != nil {
111+
return err
112+
} else if !continueRun {
113+
return nil
114+
}
71115
}
116+
return nil
117+
}
72118

119+
func (b *Base) visit(f func(selector.Choice, instance.Plugin) error) error {
73120
for _, choice := range b.Choices {
74-
instancePlugin := b.PluginClientFunc(plugins, choice.Name)
121+
log.Debug("checking choice", "choice", choice)
122+
123+
instancePlugin, err := b.PluginClientFunc(choice.Name)
124+
if err != nil {
125+
return err
126+
}
127+
75128
if instancePlugin == nil {
76129
continue
77130
}
131+
log.Debug("found instance plugin", "name", choice.Name, "client", instancePlugin)
78132
if err := f(choice, instancePlugin); err != nil {
79133
return err
80134
}
@@ -83,7 +137,6 @@ func (b *Base) visit(f func(selector.Choice, instance.Plugin) error) error {
83137
}
84138

85139
func (b *Base) doAll(count int, work func(instance.Plugin) error) error {
86-
87140
errs := make(chan error, len(b.Choices))
88141
success := make(chan interface{}, len(b.Choices))
89142
err := b.visit(func(c selector.Choice, p instance.Plugin) error {
@@ -165,17 +218,23 @@ func (b *Base) Provision(spec instance.Spec) (*instance.ID, error) {
165218

166219
// DescribeInstances returns descriptions of all instances matching all of the provided tags.
167220
func (b *Base) DescribeInstances(tags map[string]string, properties bool) ([]instance.Description, error) {
221+
222+
log.Debug("DescribeInstances", "tags", tags, "properties", properties)
223+
168224
// Loop through all the choices and aggregate all the instances
169225

170226
keys := []string{}
171227
uniques := map[string]instance.Description{}
172228

173229
err := b.visit(func(c selector.Choice, p instance.Plugin) error {
174230
instances, err := p.DescribeInstances(tags, properties)
231+
232+
log.Debug("describing instances", "choice", c, "instances", instances, "err", err)
175233
if err != nil {
176234
// It's important to fail at this point if we can't get an accurate list of instances
177235
// across the zones. This way, other controllers won't be fooled into thinking that
178236
// they need reconcile state by provisioning more instances.
237+
log.Error("describing instances", "choice", c, "err", err)
179238
return err
180239
}
181240
for _, instance := range instances {
@@ -193,6 +252,7 @@ func (b *Base) DescribeInstances(tags map[string]string, properties bool) ([]ins
193252
for _, k := range keys {
194253
result = append(result, uniques[k])
195254
}
255+
196256
return result, nil
197257
}
198258

pkg/plugin/instance/selector/internal/base_test.go

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ import (
1313
"github.com/stretchr/testify/require"
1414
)
1515

16+
func mustPlugin(p instance.Plugin, err error) instance.Plugin {
17+
if err != nil {
18+
panic(err)
19+
}
20+
return p
21+
}
22+
1623
type testDiscovery map[string]*plugin.Endpoint
1724

1825
func (td testDiscovery) List() (map[string]*plugin.Endpoint, error) {
@@ -59,21 +66,21 @@ func TestVisit(t *testing.T) {
5966
}
6067
},
6168
Choices: options,
62-
PluginClientFunc: func(m map[string]*plugin.Endpoint, n plugin.Name) instance.Plugin {
69+
PluginClientFunc: func(n plugin.Name) (instance.Plugin, error) {
6370
switch n {
6471
case n1:
65-
return p1
72+
return p1, nil
6673
case n2:
67-
return p2
74+
return p2, nil
6875
}
69-
return nil
76+
return nil, nil
7077
},
7178
}
7279

73-
m, err := b.Plugins().List()
80+
_, err := b.Plugins().List()
7481
require.NoError(t, err)
75-
require.Equal(t, p1, b.PluginClientFunc(m, options[0].Name))
76-
require.Equal(t, p2, b.PluginClientFunc(m, options[1].Name))
82+
require.Equal(t, p1, mustPlugin(b.PluginClientFunc(options[0].Name)))
83+
require.Equal(t, p2, mustPlugin(b.PluginClientFunc(options[1].Name)))
7784

7885
// Check error handling
7986
require.Error(t, b.visit(
@@ -128,14 +135,14 @@ func TestDoAll(t *testing.T) {
128135
}
129136
},
130137
Choices: options,
131-
PluginClientFunc: func(m map[string]*plugin.Endpoint, n plugin.Name) instance.Plugin {
138+
PluginClientFunc: func(n plugin.Name) (instance.Plugin, error) {
132139
switch n {
133140
case n1:
134-
return p1
141+
return p1, nil
135142
case n2:
136-
return p2
143+
return p2, nil
137144
}
138-
return nil
145+
return nil, nil
139146
},
140147
}
141148

@@ -186,14 +193,14 @@ func TestDescribeInstances(t *testing.T) {
186193
}
187194
},
188195
Choices: options,
189-
PluginClientFunc: func(m map[string]*plugin.Endpoint, n plugin.Name) instance.Plugin {
196+
PluginClientFunc: func(n plugin.Name) (instance.Plugin, error) {
190197
switch n {
191198
case n1:
192-
return p1
199+
return p1, nil
193200
case n2:
194-
return p2
201+
return p2, nil
195202
}
196-
return nil
203+
return nil, nil
197204
},
198205
}
199206

@@ -255,14 +262,14 @@ func TestValidate(t *testing.T) {
255262
}
256263
},
257264
Choices: options,
258-
PluginClientFunc: func(m map[string]*plugin.Endpoint, n plugin.Name) instance.Plugin {
265+
PluginClientFunc: func(n plugin.Name) (instance.Plugin, error) {
259266
switch n {
260267
case n1:
261-
return p1
268+
return p1, nil
262269
case n2:
263-
return p2
270+
return p2, nil
264271
}
265-
return nil
272+
return nil, nil
266273
},
267274
}
268275

@@ -315,14 +322,14 @@ func TestLabel(t *testing.T) {
315322
}
316323
},
317324
Choices: options,
318-
PluginClientFunc: func(m map[string]*plugin.Endpoint, n plugin.Name) instance.Plugin {
325+
PluginClientFunc: func(n plugin.Name) (instance.Plugin, error) {
319326
switch n {
320327
case n1:
321-
return p1
328+
return p1, nil
322329
case n2:
323-
return p2
330+
return p2, nil
324331
}
325-
return nil
332+
return nil, nil
326333
},
327334
}
328335

@@ -381,14 +388,14 @@ func TestDestroy(t *testing.T) {
381388
}
382389
},
383390
Choices: options,
384-
PluginClientFunc: func(m map[string]*plugin.Endpoint, n plugin.Name) instance.Plugin {
391+
PluginClientFunc: func(n plugin.Name) (instance.Plugin, error) {
385392
switch n {
386393
case n1:
387-
return p1
394+
return p1, nil
388395
case n2:
389-
return p2
396+
return p2, nil
390397
}
391-
return nil
398+
return nil, nil
392399
},
393400
}
394401

@@ -444,14 +451,14 @@ func TestProvision(t *testing.T) {
444451
}
445452
},
446453
Choices: options,
447-
PluginClientFunc: func(m map[string]*plugin.Endpoint, n plugin.Name) instance.Plugin {
454+
PluginClientFunc: func(n plugin.Name) (instance.Plugin, error) {
448455
switch n {
449456
case n1:
450-
return p1
457+
return p1, nil
451458
case n2:
452-
return p2
459+
return p2, nil
453460
}
454-
return nil
461+
return nil, nil
455462
},
456463
}
457464

0 commit comments

Comments
 (0)