Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 210a5d1

Browse files
author
David Chung
authored
Etcd - manager support (#439)
Signed-off-by: David Chung <[email protected]>
1 parent 63184c2 commit 210a5d1

File tree

8 files changed

+132
-20
lines changed

8 files changed

+132
-20
lines changed

cmd/manager/etcd.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package main
2+
3+
import (
4+
"time"
5+
6+
"github.com/coreos/etcd/clientv3"
7+
"github.com/docker/go-connections/tlsconfig"
8+
"github.com/docker/infrakit/pkg/discovery"
9+
etcd_leader "github.com/docker/infrakit/pkg/leader/etcd/v3"
10+
etcd_store "github.com/docker/infrakit/pkg/store/etcd/v3"
11+
etcd "github.com/docker/infrakit/pkg/util/etcd/v3"
12+
log "github.com/golang/glog"
13+
"github.com/spf13/cobra"
14+
)
15+
16+
func etcdEnvironment(getConfig func() config) *cobra.Command {
17+
18+
defaultEndpoint := etcd.LocalIP() + ":2379"
19+
20+
cmd := &cobra.Command{
21+
Use: "etcd",
22+
Short: "etcd v3 for leader detection and storage",
23+
}
24+
pollInterval := cmd.Flags().Duration("poll-interval", 5*time.Second, "Leader polling interval")
25+
requestTimeout := cmd.Flags().Duration("request-timeout", 1*time.Second, "Request timeout")
26+
endpoint := cmd.Flags().String("endpoint", defaultEndpoint, "Etcd endpoint (v3 grpc)")
27+
caFile := cmd.Flags().String("tlscacert", "", "TLS CA cert file path")
28+
certFile := cmd.Flags().String("tlscert", "", "TLS cert file path")
29+
tlsKey := cmd.Flags().String("tlskey", "", "TLS key file path")
30+
insecureSkipVerify := cmd.Flags().Bool("tlsverify", true, "True to skip TLS")
31+
cmd.RunE = func(c *cobra.Command, args []string) error {
32+
33+
options := etcd.Options{
34+
Config: clientv3.Config{
35+
Endpoints: []string{*endpoint},
36+
},
37+
RequestTimeout: *requestTimeout,
38+
}
39+
40+
if *caFile != "" && *certFile != "" && *tlsKey != "" {
41+
config, err := tlsconfig.Client(tlsconfig.Options{
42+
CAFile: *caFile,
43+
CertFile: *certFile,
44+
KeyFile: *tlsKey,
45+
InsecureSkipVerify: *insecureSkipVerify,
46+
})
47+
48+
if err != nil {
49+
return err
50+
}
51+
options.Config.TLS = config
52+
}
53+
54+
etcdClient, err := etcd.NewClient(options)
55+
log.Infoln("Connect to etcd3", *endpoint, "err=", err)
56+
if err != nil {
57+
return err
58+
}
59+
defer etcdClient.Close()
60+
61+
// Start the leader and storage backends
62+
63+
leader := etcd_leader.NewDetector(*pollInterval, etcdClient)
64+
snapshot, err := etcd_store.NewSnapshot(etcdClient)
65+
if err != nil {
66+
return err
67+
}
68+
69+
plugins, err := discovery.NewPluginDiscovery()
70+
if err != nil {
71+
return err
72+
}
73+
74+
cfg := getConfig()
75+
cfg.plugins = plugins
76+
cfg.leader = leader
77+
cfg.snapshot = snapshot
78+
79+
return runMain(cfg)
80+
}
81+
82+
return cmd
83+
}

cmd/manager/main.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
goflag "flag"
45
"os"
56
"path/filepath"
67
"time"
@@ -20,6 +21,7 @@ import (
2021
)
2122

2223
func init() {
24+
2325
cli.RegisterInfo("manager - swarm option",
2426
map[string]interface{}{
2527
"DockerClientAPIVersion": docker.ClientVersion,
@@ -58,7 +60,15 @@ func main() {
5860
}
5961
}
6062

61-
cmd.AddCommand(cli.VersionCommand(), osEnvironment(buildConfig), swarmEnvironment(buildConfig))
63+
cmd.AddCommand(cli.VersionCommand(),
64+
osEnvironment(buildConfig),
65+
swarmEnvironment(buildConfig),
66+
etcdEnvironment(buildConfig),
67+
)
68+
69+
// glog flag compatibility
70+
cmd.PersistentFlags().AddGoFlagSet(goflag.CommandLine)
71+
goflag.CommandLine.Parse([]string{})
6272

6373
err := cmd.Execute()
6474
if err != nil {

cmd/manager/swarm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func swarmEnvironment(getConfig func() config) *cobra.Command {
2626
insecureSkipVerify := cmd.Flags().Bool("tlsverify", true, "True to skip TLS")
2727
cmd.RunE = func(c *cobra.Command, args []string) error {
2828

29-
dockerClient, err := docker.NewDockerClient(*host, &tlsconfig.Options{
29+
dockerClient, err := docker.NewClient(*host, &tlsconfig.Options{
3030
CAFile: *caFile,
3131
CertFile: *certFile,
3232
KeyFile: *tlsKey,

examples/flavor/swarm/flavor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func DockerClient(spec Spec) (docker.APIClientCloser, error) {
5757
tls = &tlsconfig.Options{}
5858
}
5959

60-
return docker.NewDockerClient(spec.Docker.Host, tls)
60+
return docker.NewClient(spec.Docker.Host, tls)
6161
}
6262

6363
// baseFlavor is the base implementation. The manager / worker implementations will provide override.

pkg/leader/etcd/v3/etcd.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"time"
66

7+
"github.com/coreos/etcd/clientv3"
78
"github.com/docker/infrakit/pkg/leader"
89
"github.com/docker/infrakit/pkg/util/etcd/v3"
910
log "github.com/golang/glog"
@@ -18,25 +19,35 @@ func NewDetector(pollInterval time.Duration, client *etcd.Client) leader.Detecto
1819
}
1920

2021
// AmILeader checks if this node is a leader
21-
func AmILeader(ctx context.Context, client *etcd.Client) (bool, error) {
22+
func AmILeader(ctx context.Context, client *etcd.Client) (isLeader bool, err error) {
2223

23-
// get status of node
2424
endpoint := ""
25+
var statusResp *clientv3.StatusResponse
26+
27+
defer func() {
28+
log.V(100).Infoln("checking status at", endpoint, "resp=", statusResp, "err=", err, "leader=", isLeader)
29+
}()
30+
31+
// get status of node
2532
if len(client.Options.Config.Endpoints) > 0 {
2633
endpoint = client.Options.Config.Endpoints[0]
2734
}
2835

2936
if endpoint == "" {
30-
return false, fmt.Errorf("bad config:%v", client.Options)
37+
isLeader = false
38+
err = fmt.Errorf("bad config:%v", client.Options)
39+
return
3140
}
3241

33-
statusResp, err := client.Client.Status(ctx, endpoint)
34-
log.V(50).Infoln("checking status at", endpoint, "resp=", statusResp, "err=", err)
42+
statusResp, err = client.Client.Status(ctx, endpoint)
3543
if err != nil {
36-
return false, err
44+
isLeader = false
45+
return
3746
}
3847

3948
// The header has the self, assuming the endpoint is the self node.
4049
// The response has the id of the leader. So just compare self id and the leader id.
41-
return statusResp.Leader == statusResp.Header.MemberId, nil
50+
isLeader = statusResp.Leader == statusResp.Header.MemberId
51+
52+
return
4253
}

pkg/store/etcd/v3/etcd.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,10 @@ const (
1515
DefaultKey = "infrakit/configs/groups.json"
1616
)
1717

18-
// NewSnapshot returns a snapshot given the options
19-
func NewSnapshot(options etcd.Options) (store.Snapshot, error) {
20-
cli, err := etcd.NewClient(options)
21-
if err != nil {
22-
return nil, err
23-
}
18+
// NewSnapshot returns a snapshot given the client
19+
func NewSnapshot(client *etcd.Client) (store.Snapshot, error) {
2420
return &snapshot{
25-
client: cli,
21+
client: client,
2622
key: DefaultKey,
2723
}, nil
2824
}
@@ -78,11 +74,21 @@ func (s *snapshot) Load(output interface{}) error {
7874
}
7975
}
8076

77+
if resp == nil {
78+
log.Warningln("response is nil. server down?")
79+
return nil
80+
}
81+
8182
if resp.Count > 1 {
8283
log.Warningf("more than 1 config %v", resp)
8384
return nil
8485
}
8586

87+
if resp.Count == 0 {
88+
// no data. therefore no effect on the input
89+
return nil
90+
}
91+
8692
pair := resp.Kvs[0]
8793
any := types.AnyBytes(pair.Value)
8894
return any.Decode(&output)

pkg/store/etcd/v3/etcd_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ func testSaveLoad(t *testing.T) {
5454
RequestTimeout: 1 * time.Second,
5555
}
5656

57-
snap, err := NewSnapshot(options)
57+
etcdClient, err := etcd.NewClient(options)
58+
require.NoError(t, err)
59+
snap, err := NewSnapshot(etcdClient)
5860
require.NoError(t, err)
5961

6062
defer snap.Close()

pkg/util/docker/docker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ type APIClientCloser interface {
2626
client.CommonAPIClient
2727
}
2828

29-
// NewDockerClient creates a new API client.
30-
func NewDockerClient(host string, tls *tlsconfig.Options) (APIClientCloser, error) {
29+
// NewClient creates a new API client.
30+
func NewClient(host string, tls *tlsconfig.Options) (APIClientCloser, error) {
3131
tlsOptions := tls
3232
if tls.KeyFile == "" || tls.CAFile == "" || tls.CertFile == "" {
3333
// The api doesn't like it when you pass in not nil but with zero field values...

0 commit comments

Comments
 (0)