Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 108a1ba

Browse files
author
David Chung
authored
Persistent / replicated metadata, util init fixes, and documentation (#730)
Signed-off-by: David Chung <[email protected]>
1 parent 0b7cab4 commit 108a1ba

File tree

32 files changed

+1459
-315
lines changed

32 files changed

+1459
-315
lines changed

cmd/infrakit/manager/manager.go

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ import (
77
"strings"
88

99
"github.com/docker/infrakit/cmd/infrakit/base"
10+
"github.com/docker/infrakit/cmd/infrakit/manager/schema"
11+
1012
"github.com/docker/infrakit/pkg/cli"
1113
"github.com/docker/infrakit/pkg/discovery"
1214
logutil "github.com/docker/infrakit/pkg/log"
1315
"github.com/docker/infrakit/pkg/manager"
1416
"github.com/docker/infrakit/pkg/plugin"
17+
group_types "github.com/docker/infrakit/pkg/plugin/group/types"
1518
"github.com/docker/infrakit/pkg/rpc/client"
1619
group_rpc "github.com/docker/infrakit/pkg/rpc/group"
1720
manager_rpc "github.com/docker/infrakit/pkg/rpc/manager"
@@ -113,54 +116,36 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
113116
return err
114117
}
115118

116-
// In any case, the view should be in JSON format
117-
118-
// Treat this as an Any and then convert
119-
any := types.AnyString(view)
120-
121-
groups := []plugin.Spec{}
122-
err = any.Decode(&groups)
123-
if err != nil {
124-
log.Warn("Error parsing the template for plugin specs.")
125-
return err
126-
}
127-
128-
// Check the list of plugins
129-
for _, gp := range groups {
130-
131-
endpoint, err := plugins().Find(gp.Plugin)
119+
commitEachGroup := func(name plugin.Name, gid group.ID, gspec group_types.Spec) error {
120+
endpoint, err := plugins().Find(name)
132121
if err != nil {
133122
return err
134123
}
135-
136-
// unmarshal the group spec
137-
spec := group.Spec{}
138-
if gp.Properties != nil {
139-
err = gp.Properties.Decode(&spec)
140-
if err != nil {
141-
return err
142-
}
143-
}
144-
145-
// TODO(chungers) -- we need to enforce and confirm the type of this.
146-
// Right now we assume the RPC endpoint is indeed a group.
147124
target, err := group_rpc.NewClient(endpoint.Address)
125+
log.Debug("commit", "plugin", name, "address", endpoint.Address, "err", err, "gspec", gspec)
148126

149-
log.Debug("commit", "plugin", gp.Plugin, "address", endpoint.Address, "err", err, "spec", spec)
127+
if err != nil {
128+
return err
129+
}
150130

131+
any, err := types.AnyValue(gspec)
151132
if err != nil {
152133
return err
153134
}
154135

155-
plan, err := target.CommitGroup(spec, *pretend)
136+
plan, err := target.CommitGroup(group.Spec{
137+
ID: gid,
138+
Properties: any,
139+
}, *pretend)
140+
156141
if err != nil {
157142
return err
158143
}
159144

160-
fmt.Println("Group", spec.ID, "with plugin", gp.Plugin, "plan:", plan)
145+
fmt.Println("Group", gid, "with plugin", name, "plan:", plan)
146+
return nil
161147
}
162-
163-
return nil
148+
return schema.ParseInputSpecs([]byte(view), commitEachGroup)
164149
},
165150
}
166151
commit.Flags().AddFlagSet(templateFlags)
@@ -357,7 +342,7 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
357342
}
358343
change.AddCommand(changeList, changeGet)
359344

360-
cmd.AddCommand(commit, inspect, change, leader)
345+
cmd.AddCommand(commit, inspect, leader)
361346

362347
return cmd
363348
}

cmd/infrakit/manager/schema/v0.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package schema
2+
3+
import (
4+
"github.com/docker/infrakit/pkg/plugin"
5+
group_types "github.com/docker/infrakit/pkg/plugin/group/types"
6+
"github.com/docker/infrakit/pkg/spi/group"
7+
"github.com/docker/infrakit/pkg/types"
8+
)
9+
10+
// ParseInputSpecs parses the input bytes which is the groups.json, and calls
11+
// each time a group spec is found.
12+
func ParseInputSpecs(input []byte, foundGroupSpec func(plugin.Name, group.ID, group_types.Spec) error) error {
13+
// TODO - update the schema soon. This is the Plugin/Properties schema
14+
type spec struct {
15+
Plugin plugin.Name
16+
Properties struct {
17+
ID group.ID
18+
Properties group_types.Spec
19+
}
20+
}
21+
22+
specs := []spec{}
23+
err := types.AnyBytes(input).Decode(&specs)
24+
if err != nil {
25+
return err
26+
}
27+
for _, s := range specs {
28+
err = foundGroupSpec(s.Plugin, s.Properties.ID, s.Properties.Properties)
29+
if err != nil {
30+
return err
31+
}
32+
}
33+
return nil
34+
}

cmd/infrakit/plugin/plugin.go

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"time"
88

99
"github.com/docker/infrakit/cmd/infrakit/base"
10+
11+
"github.com/docker/infrakit/pkg/cli"
1012
"github.com/docker/infrakit/pkg/discovery"
1113
"github.com/docker/infrakit/pkg/launch"
1214
logutil "github.com/docker/infrakit/pkg/log"
@@ -16,7 +18,6 @@ import (
1618
"github.com/docker/infrakit/pkg/run/manager"
1719
group_kind "github.com/docker/infrakit/pkg/run/v0/group"
1820
manager_kind "github.com/docker/infrakit/pkg/run/v0/manager"
19-
"github.com/docker/infrakit/pkg/types"
2021
"github.com/spf13/cobra"
2122
)
2223

@@ -139,42 +140,22 @@ func Command(plugins func() discovery.Plugins) *cobra.Command {
139140
}
140141

141142
configURL := start.Flags().String("config-url", "", "URL for the startup configs")
142-
mustAll := start.Flags().Bool("all", false, "Panic if any plugin fails to start")
143-
templateFlags, toJSON, _, processTemplate := base.TemplateProcessor(plugins)
144-
start.Flags().AddFlagSet(templateFlags)
143+
144+
services := cli.NewServices(plugins)
145+
start.Flags().AddFlagSet(services.ProcessTemplateFlags)
145146

146147
start.RunE = func(c *cobra.Command, args []string) error {
147148

148149
if plugins == nil {
149150
panic("no plugins()")
150151
}
151152

152-
parsedRules := []launch.Rule{}
153-
154153
log.Info("config", "url", *configURL)
155-
156-
if *configURL != "" {
157-
buff, err := processTemplate(*configURL)
158-
if err != nil {
159-
return err
160-
}
161-
162-
view, err := toJSON([]byte(buff))
163-
if err != nil {
164-
return err
165-
}
166-
167-
configs := types.AnyBytes(view)
168-
err = configs.Decode(&parsedRules)
169-
if err != nil {
170-
return err
171-
}
172-
}
173-
174-
pluginManager, err := manager.ManagePlugins(parsedRules, plugins, *mustAll, 5*time.Second)
154+
pluginManager, err := cli.PluginManager(plugins, services, *configURL)
175155
if err != nil {
176156
return err
177157
}
158+
178159
defer func() {
179160
if r := recover(); r != nil {
180161
log.Error("Error occurred. Recovered but exiting.", "err", r)

0 commit comments

Comments
 (0)