Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit ba1ce22

Browse files
author
David Chung
authored
Auto start plugins via up + various bug fixes (#732)
Signed-off-by: David Chung <[email protected]>
1 parent 108a1ba commit ba1ce22

File tree

14 files changed

+308
-141
lines changed

14 files changed

+308
-141
lines changed

cmd/infrakit/up/up.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
3939

4040
waitDuration := up.Flags().String("wait", "1s", "Wait for plugins to be ready")
4141
configURL := up.Flags().String("config-url", "", "URL for the startup configs")
42+
stack := up.Flags().String("stack", "mystack", "Name of the stack")
43+
4244
up.Flags().AddFlagSet(services.ProcessTemplateFlags)
4345
metadatas := up.Flags().StringSlice("metadata", []string{}, "key=value to set metadata")
4446

@@ -60,7 +62,8 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
6062
wait := types.MustParseDuration(*waitDuration)
6163

6264
log.Info("Starting up base plugins")
63-
basePlugins := []string{"vars", "manager"}
65+
baseStack := fmt.Sprintf("manager:%v", *stack)
66+
basePlugins := []string{"vars", "group:group-stateless", baseStack}
6467
for _, base := range basePlugins {
6568
execName, kind, name, _ := local.StartPlugin(base).Parse()
6669
err := pluginManager.Launch(execName, kind, name, nil)

cmd/infrakit/util/init/init.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,17 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
174174
for _, start := range *starts {
175175
targets = append(targets, local.StartPlugin(start))
176176
}
177-
more, err := local.Plugins(gid, groupSpec)
178-
if err != nil {
179-
return targets, err
177+
178+
// make this either-or to allow manual overriding.
179+
if len(*starts) == 0 {
180+
181+
more, err := local.Plugins(gid, groupSpec)
182+
if err != nil {
183+
return targets, err
184+
}
185+
targets = append(targets, more...)
186+
180187
}
181-
targets = append(targets, more...)
182188
log.Info("plugins to start", "targets", targets)
183189
return
184190
}

pkg/manager/api.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,10 @@ type Options struct {
9595

9696
// MetadataRefreshInterval is the interval to check for updates to metadata
9797
MetadataRefreshInterval types.Duration
98+
99+
// LeaderCommitSpecsRetries is how many times to retry commit specs when becomes leader
100+
LeaderCommitSpecsRetries int
101+
102+
// LeaderCommitSpecsRetryInterval is how long to wait before next retry
103+
LeaderCommitSpecsRetryInterval types.Duration
98104
}

pkg/manager/manager.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ func (m *manager) getCurrentState() (globalSpec, error) {
361361
return global, nil
362362
}
363363

364-
func (m *manager) loadSpecs() error {
364+
func (m *manager) loadAndCommitSpecs() error {
365365
if m.Options.SpecStore == nil {
366366
return nil
367367
}
@@ -385,7 +385,7 @@ func (m *manager) loadMetadata() (err error) {
385385
return nil
386386
}
387387

388-
log.Debug("loading metadata and committing")
388+
log.Debug("loading metadata and committing", "V", debugV2)
389389

390390
var saved interface{}
391391
err = m.Options.MetadataStore.Load(&saved)
@@ -400,11 +400,11 @@ func (m *manager) loadMetadata() (err error) {
400400
}
401401

402402
if any == nil {
403-
log.Debug("no metadata stored")
403+
log.Debug("no metadata stored", "V", debugV2)
404404
return
405405
}
406406

407-
log.Debug("loaded metadata", "data", any.String())
407+
log.Debug("loaded metadata", "data", any.String(), "V", debugV2)
408408
_, proposed, cas, e := m.Updatable.Changes([]metadata.Change{
409409
{Path: types.Dot, Value: any},
410410
})
@@ -413,7 +413,7 @@ func (m *manager) loadMetadata() (err error) {
413413
return
414414
}
415415

416-
log.Debug("updating backend with stored metadata", "cas", cas, "proposed", proposed)
416+
log.Debug("updating backend with stored metadata", "cas", cas, "proposed", proposed, "V", debugV2)
417417
return m.Updatable.Commit(proposed, cas)
418418
}
419419

@@ -427,11 +427,39 @@ func (m *manager) onAssumeLeadership() (err error) {
427427
log.Error("error loading metadata", "err", err)
428428
}
429429

430-
err = m.loadSpecs()
431-
if err != nil {
432-
log.Error("error loading specs", "err", err)
430+
err = m.loadAndCommitSpecs()
431+
log.Debug("Loading and committing specs", "err", err)
432+
433+
if err != nil && m.Options.LeaderCommitSpecsRetries > 0 {
434+
435+
log.Info("Retry loading and committing specs",
436+
"retries", m.Options.LeaderCommitSpecsRetries,
437+
"interval", m.Options.LeaderCommitSpecsRetryInterval)
438+
439+
delay := 1 * time.Second
440+
if m.Options.LeaderCommitSpecsRetryInterval > 0 {
441+
delay = m.Options.LeaderCommitSpecsRetryInterval.Duration()
442+
}
443+
444+
for i := 1; i < m.Options.LeaderCommitSpecsRetries; i++ {
445+
446+
<-time.After(delay)
447+
448+
err = m.loadAndCommitSpecs()
449+
if err == nil {
450+
log.Info("Loaded and committed specs")
451+
return nil
452+
}
453+
454+
if err != nil {
455+
log.Error("error loading specs", "err", err, "attempt", i)
456+
}
457+
458+
}
459+
433460
}
434-
return
461+
462+
return err
435463
}
436464

437465
func (m *manager) onLostLeadership() error {

pkg/rpc/client/handshake_test.go

Lines changed: 0 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,11 @@ package client
22

33
import (
44
"fmt"
5-
"io/ioutil"
6-
"net/http"
7-
"path/filepath"
85
"testing"
96

10-
"github.com/docker/infrakit/pkg/rpc/server"
11-
"github.com/docker/infrakit/pkg/spi"
127
"github.com/stretchr/testify/require"
138
)
149

15-
var apiSpec = spi.InterfaceSpec{
16-
Name: "TestPlugin",
17-
Version: "0.1.0",
18-
}
19-
20-
func startPluginServer(t *testing.T) (server.Stoppable, string) {
21-
dir, err := ioutil.TempDir("", "infrakit_handshake_test")
22-
require.NoError(t, err)
23-
24-
name := "instance"
25-
socket := filepath.Join(dir, name)
26-
27-
testServer, err := server.StartPluginAtPath(socket, &TestPlugin{spec: apiSpec})
28-
require.NoError(t, err)
29-
return testServer, socket
30-
}
31-
3210
func TestErrVersionMismatch(t *testing.T) {
3311
var e error
3412

@@ -38,73 +16,3 @@ func TestErrVersionMismatch(t *testing.T) {
3816
e = fmt.Errorf("untyped")
3917
require.False(t, IsErrVersionMismatch(e))
4018
}
41-
42-
func TestHandshakeSuccess(t *testing.T) {
43-
testServer, socket := startPluginServer(t)
44-
defer testServer.Stop()
45-
46-
r, err := New(socket, apiSpec)
47-
require.NoError(t, err)
48-
client := rpcClient{client: r}
49-
require.NoError(t, client.DoSomething())
50-
}
51-
52-
func TestHandshakeFailVersion(t *testing.T) {
53-
testServer, socket := startPluginServer(t)
54-
defer testServer.Stop()
55-
56-
r, err := New(socket, spi.InterfaceSpec{Name: "TestPlugin", Version: "0.2.0"})
57-
require.Error(t, err)
58-
59-
client := rpcClient{client: r}
60-
err = client.DoSomething()
61-
require.Error(t, err)
62-
require.Equal(t, "Plugin supports TestPlugin interface version 0.1.0, client requires 0.2.0", err.Error())
63-
}
64-
65-
func TestHandshakeFailWrongAPI(t *testing.T) {
66-
testServer, socket := startPluginServer(t)
67-
defer testServer.Stop()
68-
69-
r, err := New(socket, spi.InterfaceSpec{Name: "OtherPlugin", Version: "0.1.0"})
70-
require.Error(t, err)
71-
72-
client := rpcClient{client: r}
73-
err = client.DoSomething()
74-
require.Error(t, err)
75-
require.Equal(t, "Plugin does not support interface OtherPlugin/0.1.0", err.Error())
76-
}
77-
78-
type rpcClient struct {
79-
client Client
80-
}
81-
82-
func (c rpcClient) DoSomething() error {
83-
req := EmptyMessage{}
84-
resp := EmptyMessage{}
85-
return c.client.Call("TestPlugin.DoSomething", req, &resp)
86-
}
87-
88-
// TestPlugin is an RPC service for this unit test.
89-
type TestPlugin struct {
90-
spec spi.InterfaceSpec
91-
}
92-
93-
// ImplementedInterface returns the interface implemented by this RPC service.
94-
func (p *TestPlugin) ImplementedInterface() spi.InterfaceSpec {
95-
return p.spec
96-
}
97-
98-
// Types returns the types
99-
func (p *TestPlugin) Types() []string {
100-
return []string{"."}
101-
}
102-
103-
// EmptyMessage is an empty test message.
104-
type EmptyMessage struct {
105-
}
106-
107-
// DoSomething is an empty test RPC.
108-
func (p *TestPlugin) DoSomething(_ *http.Request, req *EmptyMessage, resp *EmptyMessage) error {
109-
return nil
110-
}

pkg/rpc/group/publisher.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package group
2+
3+
import (
4+
"github.com/docker/infrakit/pkg/spi/event"
5+
"github.com/docker/infrakit/pkg/types"
6+
)
7+
8+
// List returns a list of *child nodes* given a path for a topic.
9+
// A topic of "." is the top level
10+
func (p *Group) List(topic types.Path) (child []string, err error) {
11+
m := map[string]interface{}{}
12+
13+
subs := p.keyed.Types()
14+
if len(subs) > 0 {
15+
for _, t := range subs {
16+
types.Put([]string{t, "commit"}, "", m)
17+
types.Put([]string{t, "describe"}, "", m)
18+
}
19+
} else {
20+
types.Put([]string{"commit"}, "", m)
21+
types.Put([]string{"describe"}, "", m)
22+
}
23+
24+
return types.List(topic, m), nil
25+
}
26+
27+
// PublishOn sets the channel to publish
28+
func (p *Group) PublishOn(chan<- *event.Event) {
29+
}

0 commit comments

Comments
 (0)