Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 8269243

Browse files
author
David Chung
authored
Manager now uses a late binding proxy for group plugin (#354)
Signed-off-by: David Chung <[email protected]>
1 parent a5967a1 commit 8269243

File tree

7 files changed

+234
-29
lines changed

7 files changed

+234
-29
lines changed

pkg/manager/group_plugin_impl.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,18 @@ import (
1212
func (m *manager) proxyForGroupPlugin(name string) (group.Plugin, error) {
1313
m.lock.Lock()
1414
defer m.lock.Unlock()
15-
16-
endpoint, err := m.plugins.Find(name)
17-
if err != nil {
18-
return nil, err
19-
}
20-
2115
m.backendName = name
22-
return rpc.NewClient(endpoint.Address), nil
23-
}
2416

25-
// This implements the Group Plugin interface to support single group-only operations
26-
// This is contrast with the declarative semantics of commit. It offers an imperative
27-
// operation by operation interface to the user.
17+
// A late-binding proxy so that we don't have a problem with having to
18+
// start up the manager as the last of all the plugins.
19+
return NewProxy(func() (group.Plugin, error) {
20+
endpoint, err := m.plugins.Find(name)
21+
if err != nil {
22+
return nil, err
23+
}
24+
return rpc.NewClient(endpoint.Address), nil
25+
}), nil
26+
}
2827

2928
func (m *manager) updateConfig(spec group.Spec) error {
3029
log.Debugln("Updating config", spec)
@@ -36,6 +35,8 @@ func (m *manager) updateConfig(spec group.Spec) error {
3635
stored := GlobalSpec{}
3736

3837
err := m.snapshot.Load(&stored)
38+
39+
log.Warningln("Error updating config:", spec, "with error=", err)
3940
// TODO: More robust (type-based) error handling.
4041
if err != nil && err.Error() != "not-found" {
4142
return err
@@ -60,21 +61,35 @@ func (m *manager) updateConfig(spec group.Spec) error {
6061
return m.snapshot.Save(stored)
6162
}
6263

63-
func (m *manager) CommitGroup(grp group.Spec, pretend bool) (string, error) {
64-
err := make(chan error)
64+
// This implements/ overrides the Group Plugin interface to support single group-only operations
65+
func (m *manager) CommitGroup(grp group.Spec, pretend bool) (resp string, err error) {
66+
67+
resultChan := make(chan []interface{})
68+
6569
m.backendOps <- backendOp{
66-
name: "watch",
70+
name: "commit",
6771
operation: func() error {
68-
log.Debugln("Proxy WatchGroup:", grp)
72+
log.Infoln("Proxy CommitGroup:", grp)
6973
if !pretend {
7074
if err := m.updateConfig(grp); err != nil {
75+
log.Warningln("Error updating", err)
7176
return err
7277
}
7378
}
74-
_, err := m.Plugin.CommitGroup(grp, pretend)
79+
resp, cerr := m.Plugin.CommitGroup(grp, pretend)
80+
log.Infoln("Responses from CommitGroup:", resp, cerr)
81+
resultChan <- []interface{}{resp, cerr}
7582
return err
7683
},
77-
err: err,
7884
}
79-
return "TODO(chungers): Allow the commit details string to be plumbed through", <-err
85+
86+
r := <-resultChan
87+
88+
if v, has := r[0].(string); has {
89+
resp = v
90+
}
91+
if v, has := r[1].(error); has && v != nil {
92+
err = v
93+
}
94+
return
8095
}

pkg/manager/group_proxy.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package manager
2+
3+
import (
4+
"sync"
5+
6+
"github.com/docker/infrakit/pkg/spi/group"
7+
)
8+
9+
// NewProxy returns a plugin interface. The proxy is late-binding in that
10+
// it does not resolve plugin until a method is called.
11+
func NewProxy(finder func() (group.Plugin, error)) group.Plugin {
12+
return &proxy{finder: finder}
13+
}
14+
15+
type proxy struct {
16+
lock sync.Mutex
17+
client group.Plugin
18+
finder func() (group.Plugin, error)
19+
}
20+
21+
func (c *proxy) run(f func(group.Plugin) error) error {
22+
c.lock.Lock()
23+
defer c.lock.Unlock()
24+
25+
if c.client == nil {
26+
if p, err := c.finder(); err == nil {
27+
c.client = p
28+
} else {
29+
return err
30+
}
31+
}
32+
33+
return f(c.client)
34+
}
35+
36+
func (c *proxy) CommitGroup(grp group.Spec, pretend bool) (resp string, err error) {
37+
err = c.run(func(g group.Plugin) error {
38+
resp, err = g.CommitGroup(grp, pretend)
39+
return err
40+
})
41+
return
42+
}
43+
44+
func (c *proxy) FreeGroup(id group.ID) (err error) {
45+
err = c.run(func(g group.Plugin) error {
46+
err = g.FreeGroup(id)
47+
return err
48+
})
49+
return
50+
}
51+
52+
func (c *proxy) DescribeGroup(id group.ID) (desc group.Description, err error) {
53+
err = c.run(func(g group.Plugin) error {
54+
desc, err = g.DescribeGroup(id)
55+
return err
56+
})
57+
return
58+
}
59+
60+
func (c *proxy) DestroyGroup(id group.ID) (err error) {
61+
err = c.run(func(g group.Plugin) error {
62+
err = g.DestroyGroup(id)
63+
return err
64+
})
65+
return
66+
}
67+
68+
func (c *proxy) InspectGroups() (specs []group.Spec, err error) {
69+
err = c.run(func(g group.Plugin) error {
70+
specs, err = g.InspectGroups()
71+
return err
72+
})
73+
return
74+
}

pkg/manager/group_proxy_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package manager
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/docker/infrakit/pkg/spi/group"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestErrorOnCallsToNilPlugin(t *testing.T) {
12+
13+
errMessage := "no-plugin"
14+
proxy := NewProxy(func() (group.Plugin, error) {
15+
return nil, errors.New(errMessage)
16+
})
17+
18+
err := proxy.FreeGroup(group.ID("test"))
19+
require.Error(t, err)
20+
require.Equal(t, errMessage, err.Error())
21+
}
22+
23+
type fakeGroupPlugin struct {
24+
group.Plugin
25+
commitGroup func(grp group.Spec, pretend bool) (string, error)
26+
freeGroup func(id group.ID) error
27+
}
28+
29+
func (f *fakeGroupPlugin) CommitGroup(grp group.Spec, pretend bool) (string, error) {
30+
return f.commitGroup(grp, pretend)
31+
}
32+
func (f *fakeGroupPlugin) FreeGroup(id group.ID) error {
33+
return f.freeGroup(id)
34+
}
35+
36+
func TestDelayPluginLookupCallingMethod(t *testing.T) {
37+
38+
called := false
39+
fake := &fakeGroupPlugin{
40+
commitGroup: func(grp group.Spec, pretend bool) (string, error) {
41+
called = true
42+
require.Equal(t, group.Spec{ID: "foo"}, grp)
43+
require.Equal(t, true, pretend)
44+
return "some-response", nil
45+
},
46+
}
47+
48+
proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })
49+
50+
require.False(t, called)
51+
52+
actualStr, actualErr := proxy.CommitGroup(group.Spec{ID: "foo"}, true)
53+
require.True(t, called)
54+
require.NoError(t, actualErr)
55+
require.Equal(t, "some-response", actualStr)
56+
}
57+
58+
func TestDelayPluginLookupCallingMethodReturnsError(t *testing.T) {
59+
60+
called := false
61+
fake := &fakeGroupPlugin{
62+
freeGroup: func(id group.ID) error {
63+
called = true
64+
require.Equal(t, group.ID("foo"), id)
65+
return errors.New("can't-free")
66+
},
67+
}
68+
69+
proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })
70+
71+
require.False(t, called)
72+
73+
actualErr := proxy.FreeGroup(group.ID("foo"))
74+
require.True(t, called)
75+
require.Error(t, actualErr)
76+
require.Equal(t, "can't-free", actualErr.Error())
77+
}
78+
79+
func TestDelayPluginLookupCallingMultipleMethods(t *testing.T) {
80+
81+
called := false
82+
fake := &fakeGroupPlugin{
83+
commitGroup: func(grp group.Spec, pretend bool) (string, error) {
84+
called = true
85+
require.Equal(t, group.Spec{ID: "foo"}, grp)
86+
require.Equal(t, true, pretend)
87+
return "some-response", nil
88+
},
89+
freeGroup: func(id group.ID) error {
90+
called = true
91+
require.Equal(t, group.ID("foo"), id)
92+
return errors.New("can't-free")
93+
},
94+
}
95+
96+
proxy := NewProxy(func() (group.Plugin, error) { return fake, nil })
97+
98+
require.False(t, called)
99+
100+
actualStr, actualErr := proxy.CommitGroup(group.Spec{ID: "foo"}, true)
101+
require.True(t, called)
102+
require.NoError(t, actualErr)
103+
require.Equal(t, "some-response", actualStr)
104+
105+
called = false
106+
actualErr = proxy.FreeGroup(group.ID("foo"))
107+
require.True(t, called)
108+
require.Error(t, actualErr)
109+
require.Equal(t, "can't-free", actualErr.Error())
110+
}

pkg/manager/manager.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"sync"
77

8-
"errors"
98
log "github.com/Sirupsen/logrus"
109
"github.com/docker/infrakit/pkg/discovery"
1110
"github.com/docker/infrakit/pkg/leader"
@@ -43,7 +42,6 @@ type manager struct {
4342
type backendOp struct {
4443
name string
4544
operation func() error
46-
err chan<- error
4745
}
4846

4947
// NewManager returns the manager which depends on other services to coordinate and manage
@@ -114,9 +112,7 @@ func (m *manager) Start() (<-chan struct{}, error) {
114112
case op := <-backendOps:
115113
log.Debugln("Backend operation:", op)
116114
if m.isLeader {
117-
op.err <- op.operation()
118-
} else {
119-
op.err <- errors.New("not-a-leader")
115+
op.operation()
120116
}
121117

122118
case <-stopWorkQueue:

pkg/store/file/file.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,12 @@ func (s *snapshot) Save(obj interface{}) error {
4343
// Load loads a snapshot and marshals into the given reference
4444
func (s *snapshot) Load(output interface{}) error {
4545
buff, err := ioutil.ReadFile(filepath.Join(s.dir, s.name))
46-
if err != nil {
46+
if err == nil {
47+
return json.Unmarshal(buff, output)
48+
}
49+
if os.IsExist(err) {
50+
// if file exists and we have problem reading
4751
return err
4852
}
49-
return json.Unmarshal(buff, output)
53+
return nil
5054
}

pkg/store/store.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ type Snapshot interface {
77
// Save marshals (encodes) and saves a snapshot of the given object.
88
Save(obj interface{}) error
99

10-
// Load loads a snapshot and marshals (decodes) into the given reference
10+
// Load loads a snapshot and marshals (decodes) into the given reference.
11+
// If no data is available to unmarshal into the given struct, the fuction returns nil.
1112
Load(output interface{}) error
1213
}

pkg/store/swarm/swarm.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,17 @@ func (s *snapshot) Save(obj interface{}) error {
4242
// Load loads a snapshot and marshals into the given reference
4343
func (s *snapshot) Load(output interface{}) error {
4444
label, err := readSwarm(s.client)
45-
if err != nil {
45+
if err == nil {
46+
return decode(label, output)
47+
}
48+
if err != errNotFound {
4649
return err
4750
}
48-
return decode(label, output)
51+
return nil
4952
}
5053

54+
var errNotFound = fmt.Errorf("not-found")
55+
5156
func readSwarm(client client.APIClient) (string, error) {
5257
info, err := client.SwarmInspect(context.Background())
5358
if err != nil {
@@ -60,7 +65,7 @@ func readSwarm(client client.APIClient) (string, error) {
6065
return l, nil
6166
}
6267
}
63-
return "", fmt.Errorf("not-found")
68+
return "", errNotFound
6469
}
6570

6671
func writeSwarm(client client.APIClient, value string) error {

0 commit comments

Comments
 (0)