Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 5a8272f

Browse files
author
David Chung
authored
Optionally block and retry metadata reads (#719)
Signed-off-by: David Chung <[email protected]>
1 parent 74bdaf6 commit 5a8272f

File tree

5 files changed

+217
-86
lines changed

5 files changed

+217
-86
lines changed

pkg/cli/v0/event/ls.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,20 @@ func Ls(name string, services *cli.Services) *cobra.Command {
6969
}
7070
}
7171

72+
// list paths relative to this path
73+
rel := types.PathFromString(p)
74+
if gopath.Dir(name) != "." {
75+
// the name has / in it. so the relative path is longer
76+
// ex) timer/streams and streams/msec/
77+
rel = types.PathFromString(name).Shift(1).Join(rel)
78+
}
7279
if p == "." && !*all {
7380
// special case of showing the top level plugin namespaces
7481
if i > 0 && !*quick {
7582
fmt.Println()
7683
}
7784
for _, l := range nodes {
78-
fmt.Println(l.Shift(1).Rel(types.PathFromString(p)))
85+
fmt.Println(l.Rel(rel))
7986
}
8087
break
8188
}
@@ -84,7 +91,7 @@ func Ls(name string, services *cli.Services) *cobra.Command {
8491
fmt.Printf("total %d:\n", len(nodes))
8592
}
8693
for _, l := range nodes {
87-
fmt.Println(l.Shift(1).Rel(types.PathFromString(p)))
94+
fmt.Println(l.Rel(rel))
8895
}
8996

9097
}

pkg/cli/v0/metadata/cat.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strconv"
77

88
"github.com/docker/infrakit/pkg/cli"
9+
metadata "github.com/docker/infrakit/pkg/plugin/metadata/template"
910
"github.com/docker/infrakit/pkg/types"
1011
"github.com/spf13/cobra"
1112
)
@@ -18,41 +19,41 @@ func Cat(name string, services *cli.Services) *cobra.Command {
1819
Short: "Get metadata entry by path",
1920
}
2021

22+
retry := cat.Flags().Duration("retry", 0, "Retry interval (e.g. 1s)")
23+
timeout := cat.Flags().Duration("timeout", 0, "Timeout")
24+
errOnTimeout := cat.Flags().Bool("err-on-timeout", false, "Return error on timeout")
25+
2126
cat.RunE = func(cmd *cobra.Command, args []string) error {
2227

23-
metadataPlugin, err := loadPlugin(services.Plugins(), name)
24-
if err != nil {
25-
return nil
26-
}
27-
cli.MustNotNil(metadataPlugin, "metadata plugin not found", "name", name)
28+
metadataFunc := metadata.MetadataFunc(services.Plugins)
2829

2930
for _, p := range args {
3031

31-
path := types.PathFromString(gopath.Join(name, p))
32-
first := path.Index(0)
33-
if first != nil {
32+
path := gopath.Join(name, p)
3433

35-
path = path.Shift(1)
34+
var value interface{}
35+
var err error
3636

37-
value, err := metadataPlugin.Get(path)
38-
if err != nil {
39-
log.Warn("Cannot metadata cat on plugin", "target", *first, "err", err)
40-
continue
41-
}
42-
if value == nil {
43-
log.Warn("value is nil")
44-
continue
45-
}
37+
if *retry > 0 {
38+
value, err = metadataFunc(path, *retry, *timeout, *errOnTimeout)
39+
} else {
40+
value, err = metadataFunc(path)
41+
}
4642

47-
str := value.String()
43+
result := value
44+
if value, is := value.(*types.Any); is && value != nil {
4845
if s, err := strconv.Unquote(value.String()); err == nil {
49-
str = s
46+
result = s
5047
}
48+
}
5149

52-
err = services.Output(os.Stdout, str, nil)
53-
if err != nil {
54-
return err
55-
}
50+
if result == nil {
51+
result = false
52+
}
53+
54+
err = services.Output(os.Stdout, result, nil)
55+
if err != nil {
56+
return err
5657
}
5758
}
5859
return nil

pkg/plugin/metadata/template/template.go

Lines changed: 158 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package template
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/docker/infrakit/pkg/discovery"
78
metadata_rpc "github.com/docker/infrakit/pkg/rpc/metadata"
@@ -20,64 +21,179 @@ func MetadataFunc(discovery func() discovery.Plugins) func(string, ...interface{
2021

2122
return func(path string, optionalValue ...interface{}) (interface{}, error) {
2223

23-
if plugins == nil {
24-
return nil, fmt.Errorf("no plugin discovery:%s", path)
24+
switch len(optionalValue) {
25+
case 0: // read
26+
return doGetSet(plugins, path, optionalValue...)
27+
case 1: // set
28+
return doGetSet(plugins, path, optionalValue...)
29+
case 2: // a retry time + timeout is specified for a read
30+
retry, err := duration(optionalValue[0])
31+
if err != nil {
32+
return nil, err
33+
}
34+
timeout, err := duration(optionalValue[1])
35+
if err != nil {
36+
return nil, err
37+
}
38+
return doBlockingGet(plugins, path, retry, timeout)
39+
case 3: // a retry time + timeout is specified for a read + bool to return error
40+
retry, err := duration(optionalValue[0])
41+
if err != nil {
42+
return nil, err
43+
}
44+
timeout, err := duration(optionalValue[1])
45+
if err != nil {
46+
return nil, err
47+
}
48+
errOnTimeout, is := optionalValue[2].(bool)
49+
if !is {
50+
return nil, fmt.Errorf("must be boolean %v", optionalValue[2])
51+
}
52+
v, err := doBlockingGet(plugins, path, retry, timeout)
53+
if errOnTimeout {
54+
return v, err
55+
}
56+
return v, nil
2557
}
58+
return template.VoidValue, fmt.Errorf("wrong number of args")
59+
}
60+
}
2661

27-
mpath := types.PathFromString(path)
28-
first := mpath.Index(0)
29-
if first == nil {
30-
return nil, fmt.Errorf("unknown plugin from path: %s", path)
31-
}
62+
func duration(v interface{}) (time.Duration, error) {
63+
switch v := v.(type) {
64+
case time.Duration:
65+
return v, nil
66+
case types.Duration:
67+
return v.Duration(), nil
68+
case []byte:
69+
return time.ParseDuration(string(v))
70+
case string:
71+
return time.ParseDuration(string(v))
72+
case int64:
73+
return time.Duration(int64(v)), nil
74+
case uint:
75+
return time.Duration(int64(v)), nil
76+
case uint64:
77+
return time.Duration(int64(v)), nil
78+
case int:
79+
return time.Duration(int64(v)), nil
80+
}
81+
return 0, fmt.Errorf("cannot convert to duration: %v", v)
82+
}
83+
84+
// blocking get from metadata. block the same go routine / thread until timeout or value is available
85+
func doBlockingGet(plugins func() discovery.Plugins, path string, retry, timeout time.Duration) (interface{}, error) {
86+
87+
if plugins == nil {
88+
return nil, fmt.Errorf("no plugin discovery:%s", path)
89+
}
90+
91+
mpath := types.PathFromString(path)
92+
first := mpath.Index(0)
93+
if first == nil {
94+
return nil, fmt.Errorf("unknown plugin from path: %s", path)
95+
}
96+
97+
lookup, err := plugins().List()
98+
endpoint, has := lookup[*first]
99+
if !has {
100+
return false, nil // Don't return error. Just return false for non-existence
101+
} else if mpath.Len() == 1 {
102+
return true, nil // This is a test for availability of the plugin
103+
}
104+
105+
metadataPlugin, err := metadata_rpc.NewClient(endpoint.Address)
106+
if err != nil {
107+
return nil, fmt.Errorf("cannot connect to plugin: %s", *first)
108+
}
109+
110+
key := mpath.Shift(1)
111+
var value interface{}
112+
113+
expiry := time.Now().Add(timeout)
114+
115+
for i := 0; ; i++ {
32116

33-
lookup, err := plugins().List()
34-
endpoint, has := lookup[*first]
35-
if !has {
36-
return false, nil // Don't return error. Just return false for non-existence
37-
} else if mpath.Len() == 1 {
38-
return true, nil // This is a test for availability of the plugin
117+
any, err := metadataPlugin.Get(key)
118+
if err == nil && any != nil {
119+
err = any.Decode(&value)
120+
if err != nil {
121+
return any.String(), err // note the type changed to string in error return
122+
}
123+
return value, err
39124
}
40125

41-
metadataPlugin, err := metadata_rpc.NewClient(endpoint.Address)
42-
if err != nil {
43-
return nil, fmt.Errorf("cannot connect to plugin: %s", *first)
126+
if i > 0 && time.Now().After(expiry) {
127+
break
44128
}
45129

46-
key := mpath.Shift(1)
47-
var value interface{}
48-
any, err := metadataPlugin.Get(key)
49-
if err != nil {
50-
return nil, err
130+
if retry > 0 {
131+
<-time.After(retry)
51132
}
133+
}
134+
return value, fmt.Errorf("expired waiting")
135+
}
136+
137+
func doGetSet(plugins func() discovery.Plugins, path string, optionalValue ...interface{}) (interface{}, error) {
138+
if plugins == nil {
139+
return nil, fmt.Errorf("no plugin discovery:%s", path)
140+
}
141+
142+
mpath := types.PathFromString(path)
143+
first := mpath.Index(0)
144+
if first == nil {
145+
return nil, fmt.Errorf("unknown plugin from path: %s", path)
146+
}
147+
148+
lookup, err := plugins().List()
149+
endpoint, has := lookup[*first]
150+
if !has {
151+
return false, nil // Don't return error. Just return false for non-existence
152+
} else if mpath.Len() == 1 {
153+
return true, nil // This is a test for availability of the plugin
154+
}
52155

156+
metadataPlugin, err := metadata_rpc.NewClient(endpoint.Address)
157+
if err != nil {
158+
return nil, fmt.Errorf("cannot connect to plugin: %s", *first)
159+
}
160+
161+
key := mpath.Shift(1)
162+
var value interface{}
163+
any, err := metadataPlugin.Get(key)
164+
if err != nil {
165+
return nil, err
166+
}
167+
168+
if any != nil {
53169
err = any.Decode(&value)
54170
if err != nil {
55171
return any.String(), err // note the type changed to string in error return
56172
}
173+
}
57174

58-
// Update case: return value is the version before this successful commit.
59-
if len(optionalValue) == 1 {
175+
// Update case: return value is the version before this successful commit.
176+
if len(optionalValue) == 1 {
60177

61-
any, err := types.AnyValue(optionalValue[0])
62-
if err != nil {
63-
return value, err
64-
}
65-
66-
// update it
67-
updatablePlugin, is := metadataPlugin.(metadata.Updatable)
68-
if !is {
69-
return value, fmt.Errorf("value is read-only")
70-
}
71-
_, proposed, cas, err := updatablePlugin.Changes([]metadata.Change{
72-
{
73-
Path: key,
74-
Value: any,
75-
},
76-
})
77-
err = updatablePlugin.Commit(proposed, cas)
78-
return template.VoidValue, err
178+
any, err := types.AnyValue(optionalValue[0])
179+
if err != nil {
180+
return value, err
79181
}
80182

81-
return value, err
183+
// update it
184+
updatablePlugin, is := metadataPlugin.(metadata.Updatable)
185+
if !is {
186+
return value, fmt.Errorf("value is read-only")
187+
}
188+
_, proposed, cas, err := updatablePlugin.Changes([]metadata.Change{
189+
{
190+
Path: key,
191+
Value: any,
192+
},
193+
})
194+
err = updatablePlugin.Commit(proposed, cas)
195+
return template.VoidValue, err
82196
}
197+
198+
return value, err
83199
}

pkg/plugin/metadata/updatable.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ import (
99
"github.com/imdario/mergo"
1010
)
1111

12-
var log = logutil.New("module", "plugin/metadata/updatable")
12+
var (
13+
log = logutil.New("module", "plugin/metadata/updatable")
14+
debugV = logutil.V(300)
15+
)
1316

1417
// LoadFunc is the function for returning the original to modify
1518
type LoadFunc func() (original *types.Any, err error)
@@ -51,20 +54,20 @@ func (p updatable) Changes(changes []metadata.Change) (original, proposed *types
5154
if err != nil {
5255
return
5356
}
54-
log.Info("original", "original", original.String())
57+
log.Debug("original", "original", original.String(), "V", debugV)
5558

5659
var current map[string]interface{}
5760
if err = original.Decode(&current); err != nil {
5861
return
5962
}
60-
log.Info("decoded", "current", current)
63+
log.Debug("decoded", "current", current, "V", debugV)
6164

6265
changeSet, e := changeSet(changes)
6366
if e != nil {
6467
err = e
6568
return
6669
}
67-
log.Info("changeset", "changeset", changeSet)
70+
log.Debug("changeset", "changeset", changeSet, "V", debugV)
6871

6972
var applied map[string]interface{}
7073
if err = changeSet.Decode(&applied); err != nil {
@@ -75,15 +78,15 @@ func (p updatable) Changes(changes []metadata.Change) (original, proposed *types
7578
return
7679
}
7780

78-
log.Info("decoded2", "applied", applied)
81+
log.Debug("decoded2", "applied", applied, "V", debugV)
7982

8083
// encoded it back to bytes
8184
proposed, err = types.AnyValue(applied)
8285
if err != nil {
8386
return
8487
}
8588

86-
log.Info("proposed", "proposed", proposed.String())
89+
log.Debug("proposed", "proposed", proposed.String(), "V", debugV)
8790

8891
cas = types.Fingerprint(original, proposed)
8992
return

0 commit comments

Comments
 (0)