Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit c569045

Browse files
author
David Chung
authored
Simple tcp to socket multiplexing reverse proxy (#438)
Signed-off-by: David Chung <[email protected]>
1 parent 8582433 commit c569045

File tree

17 files changed

+1082
-68
lines changed

17 files changed

+1082
-68
lines changed

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ endif
126126
$(call build_binary,infrakit-instance-maas,github.com/docker/infrakit/examples/instance/maas)
127127
$(call build_binary,infrakit-event-time,github.com/docker/infrakit/examples/event/time)
128128

129+
cli: build-cli
130+
build-cli:
131+
@echo "+ $@"
132+
ifneq (,$(findstring .m,$(VERSION)))
133+
@echo "\nWARNING - repository contains uncommitted changes, tagging binaries as dirty\n"
134+
endif
135+
$(call build_binary,infrakit,github.com/docker/infrakit/cmd/cli)
136+
129137
install:
130138
@echo "+ $@"
131139
@go install ${GO_LDFLAGS} $(PKGS)

cmd/cli/main.go

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,73 @@ package main
22

33
import (
44
"errors"
5+
"flag"
6+
"net/url"
57
"os"
68

7-
log "github.com/Sirupsen/logrus"
89
"github.com/docker/infrakit/pkg/cli"
910
"github.com/docker/infrakit/pkg/discovery"
11+
"github.com/docker/infrakit/pkg/discovery/remote"
12+
logutil "github.com/docker/infrakit/pkg/log"
1013
"github.com/spf13/cobra"
1114
)
1215

1316
// A generic client for infrakit
14-
1517
func main() {
1618

19+
log := logutil.New("module", "main")
20+
1721
cmd := &cobra.Command{
1822
Use: os.Args[0],
1923
Short: "infrakit cli",
2024
}
21-
logLevel := cmd.PersistentFlags().Int("log", cli.DefaultLogLevel, "Logging level. 0 is least verbose. Max is 5")
25+
26+
// Log setup
27+
logOptions := &logutil.ProdDefaults
28+
ulist := []*url.URL{}
29+
remotes := []string{}
30+
31+
cmd.PersistentFlags().AddFlagSet(cli.Flags(logOptions))
32+
cmd.PersistentFlags().AddGoFlagSet(flag.CommandLine)
33+
34+
cmd.PersistentFlags().StringSliceVarP(&remotes, "host", "H", remotes, "host list. Default is local sockets")
35+
36+
// parse the list of hosts
2237
cmd.PersistentPreRunE = func(c *cobra.Command, args []string) error {
23-
cli.SetLogLevel(*logLevel)
38+
logutil.Configure(logOptions)
39+
40+
if len(remotes) > 0 {
41+
for _, h := range remotes {
42+
u, err := url.Parse(h)
43+
if err != nil {
44+
return err
45+
}
46+
ulist = append(ulist, u)
47+
}
48+
}
2449
return nil
2550
}
2651

27-
// Don't print usage text for any error returned from a RunE function. Only print it when explicitly requested.
52+
// Don't print usage text for any error returned from a RunE function.
53+
// Only print it when explicitly requested.
2854
cmd.SilenceUsage = true
2955

30-
// Don't automatically print errors returned from a RunE function. They are returned from cmd.Execute() below
31-
// and we print it ourselves.
56+
// Don't automatically print errors returned from a RunE function.
57+
// They are returned from cmd.Execute() below and we print it ourselves.
3258
cmd.SilenceErrors = true
3359
f := func() discovery.Plugins {
34-
d, err := discovery.NewPluginDiscovery()
60+
if len(ulist) == 0 {
61+
d, err := discovery.NewPluginDiscovery()
62+
if err != nil {
63+
log.Crit("Failed to initialize plugin discovery", "err", err)
64+
os.Exit(1)
65+
}
66+
return d
67+
}
68+
69+
d, err := remote.NewPluginDiscovery(ulist)
3570
if err != nil {
36-
log.Fatalf("Failed to initialize plugin discovery: %s", err)
71+
log.Crit("Failed to initialize remote plugin discovery", "err", err)
3772
os.Exit(1)
3873
}
3974
return d
@@ -46,19 +81,20 @@ func main() {
4681
cmd.AddCommand(metadataCommand(f))
4782
cmd.AddCommand(eventCommand(f))
4883
cmd.AddCommand(pluginCommand(f))
84+
cmd.AddCommand(utilCommand(f))
4985

5086
cmd.AddCommand(instancePluginCommand(f), groupPluginCommand(f), flavorPluginCommand(f), resourcePluginCommand(f))
5187

5288
err := cmd.Execute()
5389
if err != nil {
54-
log.Fatal(err)
90+
log.Crit("error executing", "err", err)
5591
os.Exit(1)
5692
}
5793
}
5894

5995
func assertNotNil(message string, f interface{}) {
6096
if f == nil {
61-
log.Error(errors.New(message))
97+
logutil.New("main", "assert").Error("assert failed", "err", errors.New(message))
6298
os.Exit(1)
6399
}
64100
}

cmd/cli/util.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package main
2+
3+
import (
4+
"time"
5+
6+
"github.com/docker/infrakit/pkg/discovery"
7+
"github.com/docker/infrakit/pkg/log"
8+
"github.com/docker/infrakit/pkg/rpc/mux"
9+
"github.com/spf13/cobra"
10+
)
11+
12+
var logger = log.New("module", "cli/util")
13+
14+
func muxCommand(plugins func() discovery.Plugins) *cobra.Command {
15+
cmd := &cobra.Command{
16+
Use: "mux",
17+
Short: "API mux service",
18+
}
19+
20+
listen := cmd.Flags().StringP("listen", "l", ":8080", "Listening port")
21+
autoStop := cmd.Flags().BoolP("auto-stop", "a", false, "True to stop when no plugins are running")
22+
interval := cmd.Flags().DurationP("scan", "s", 1*time.Minute, "Scan interval to check for plugins")
23+
24+
cmd.RunE = func(c *cobra.Command, args []string) error {
25+
logger.Info("Starting mux server", "listen", *listen)
26+
server, err := mux.NewServer(*listen, plugins)
27+
if err != nil {
28+
return err
29+
}
30+
defer server.Stop()
31+
32+
block := make(chan struct{})
33+
// If the sockets are gone, then we can safely exit.
34+
go func() {
35+
checkNow := time.Tick(*interval)
36+
for {
37+
select {
38+
case <-server.Wait():
39+
logger.Info("server stopped")
40+
close(block)
41+
return
42+
43+
case <-checkNow:
44+
if m, err := plugins().List(); err == nil {
45+
if len(m) == 0 && *autoStop {
46+
logger.Info("scan found no plugins.")
47+
close(block)
48+
return
49+
}
50+
}
51+
52+
}
53+
}
54+
}()
55+
56+
<-block
57+
58+
return nil
59+
}
60+
61+
return cmd
62+
}
63+
64+
func utilCommand(plugins func() discovery.Plugins) *cobra.Command {
65+
66+
util := &cobra.Command{
67+
Use: "util",
68+
Short: "Utilties",
69+
}
70+
71+
util.AddCommand(muxCommand(plugins))
72+
73+
return util
74+
}

pkg/discovery/dir.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,51 @@ import (
44
"fmt"
55
"io/ioutil"
66
"os"
7+
"os/user"
8+
"path"
79
"path/filepath"
810
"sync"
911

1012
log "github.com/Sirupsen/logrus"
1113
"github.com/docker/infrakit/pkg/plugin"
1214
)
1315

14-
type errNotUnixSocket string
16+
// Dir returns the directory to use for plugin discovery, which may be customized by the environment.
17+
func Dir() string {
18+
if pluginDir := os.Getenv(PluginDirEnvVar); pluginDir != "" {
19+
return pluginDir
20+
}
1521

16-
func (e errNotUnixSocket) Error() string {
17-
return string(e)
22+
home := os.Getenv("HOME")
23+
if usr, err := user.Current(); err == nil {
24+
home = usr.HomeDir
25+
}
26+
return path.Join(home, ".infrakit/plugins")
1827
}
1928

20-
// IsErrNotUnixSocket returns true if the error is due to the file not being a valid unix socket.
21-
func IsErrNotUnixSocket(e error) bool {
22-
_, is := e.(errNotUnixSocket)
23-
return is
29+
// NewPluginDiscovery creates a plugin discovery based on the environment configuration.
30+
func NewPluginDiscovery() (Plugins, error) {
31+
return NewPluginDiscoveryWithDirectory(Dir())
32+
}
33+
34+
// NewPluginDiscoveryWithDirectory creates a plugin discovery based on the directory given.
35+
func NewPluginDiscoveryWithDirectory(pluginDir string) (Plugins, error) {
36+
stat, err := os.Stat(pluginDir)
37+
if err == nil {
38+
if !stat.IsDir() {
39+
return nil, fmt.Errorf("Plugin dir %s is a file", pluginDir)
40+
}
41+
} else {
42+
if os.IsNotExist(err) {
43+
if err := os.MkdirAll(pluginDir, 0700); err != nil {
44+
return nil, fmt.Errorf("Failed to create plugin dir %s: %s", pluginDir, err)
45+
}
46+
} else {
47+
return nil, fmt.Errorf("Failed to access plugin dir %s: %s", pluginDir, err)
48+
}
49+
}
50+
51+
return newDirPluginDiscovery(pluginDir)
2452
}
2553

2654
type dirPluginDiscovery struct {
@@ -38,7 +66,7 @@ func (r *dirPluginDiscovery) Find(name plugin.Name) (*plugin.Endpoint, error) {
3866

3967
p, exists := plugins[lookup]
4068
if !exists {
41-
return nil, fmt.Errorf("Plugin not found: %s (looked up using %s)", name, lookup)
69+
return nil, ErrNotFound(string(name))
4270
}
4371

4472
return p, nil
@@ -54,16 +82,16 @@ func newDirPluginDiscovery(dir string) (*dirPluginDiscovery, error) {
5482
}
5583

5684
func (r *dirPluginDiscovery) dirLookup(entry os.FileInfo) (*plugin.Endpoint, error) {
85+
socketPath := filepath.Join(r.dir, entry.Name())
5786
if entry.Mode()&os.ModeSocket != 0 {
58-
socketPath := filepath.Join(r.dir, entry.Name())
5987
return &plugin.Endpoint{
6088
Protocol: "unix",
6189
Address: socketPath,
6290
Name: entry.Name(),
6391
}, nil
6492
}
6593

66-
return nil, errNotUnixSocket(fmt.Sprintf("File is not a socket: %s", entry))
94+
return nil, ErrNotUnixSocket(socketPath)
6795
}
6896

6997
// List returns a list of plugins known, keyed by the name

pkg/discovery/dir_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
func TestErrNotUnixSocket(t *testing.T) {
17-
err := errNotUnixSocket("no socket!")
17+
err := ErrNotUnixSocket("no socket!")
1818
require.Error(t, err)
1919
require.True(t, IsErrNotUnixSocket(err))
2020
}

pkg/discovery/discovery.go

Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ package discovery
22

33
import (
44
"fmt"
5-
"os"
6-
"os/user"
7-
"path"
85

96
"github.com/docker/infrakit/pkg/plugin"
107
)
@@ -21,40 +18,28 @@ const (
2118
PluginDirEnvVar = "INFRAKIT_PLUGINS_DIR"
2219
)
2320

24-
// Dir returns the directory to use for plugin discovery, which may be customized by the environment.
25-
func Dir() string {
26-
if pluginDir := os.Getenv(PluginDirEnvVar); pluginDir != "" {
27-
return pluginDir
28-
}
29-
30-
home := os.Getenv("HOME")
31-
if usr, err := user.Current(); err == nil {
32-
home = usr.HomeDir
33-
}
34-
return path.Join(home, ".infrakit/plugins")
21+
// ErrNotUnixSocket is the error raised when the file is not a unix socket
22+
type ErrNotUnixSocket string
23+
24+
func (e ErrNotUnixSocket) Error() string {
25+
return fmt.Sprintf("not a unix socket:%s", string(e))
26+
}
27+
28+
// IsErrNotUnixSocket returns true if the error is due to the file not being a valid unix socket.
29+
func IsErrNotUnixSocket(e error) bool {
30+
_, is := e.(ErrNotUnixSocket)
31+
return is
3532
}
3633

37-
// NewPluginDiscovery creates a plugin discovery based on the environment configuration.
38-
func NewPluginDiscovery() (Plugins, error) {
39-
return NewPluginDiscoveryWithDirectory(Dir())
34+
// ErrNotFound is the error raised when the plugin is not found
35+
type ErrNotFound string
36+
37+
func (e ErrNotFound) Error() string {
38+
return fmt.Sprintf("plugin not found:%s", string(e))
4039
}
4140

42-
// NewPluginDiscoveryWithDirectory creates a plugin discovery based on the directory given.
43-
func NewPluginDiscoveryWithDirectory(pluginDir string) (Plugins, error) {
44-
stat, err := os.Stat(pluginDir)
45-
if err == nil {
46-
if !stat.IsDir() {
47-
return nil, fmt.Errorf("Plugin dir %s is a file", pluginDir)
48-
}
49-
} else {
50-
if os.IsNotExist(err) {
51-
if err := os.MkdirAll(pluginDir, 0700); err != nil {
52-
return nil, fmt.Errorf("Failed to create plugin dir %s: %s", pluginDir, err)
53-
}
54-
} else {
55-
return nil, fmt.Errorf("Failed to access plugin dir %s: %s", pluginDir, err)
56-
}
57-
}
58-
59-
return newDirPluginDiscovery(pluginDir)
41+
// IsErrNotFound returns true if the error is due to a plugin not found.
42+
func IsErrNotFound(e error) bool {
43+
_, is := e.(ErrNotFound)
44+
return is
6045
}

0 commit comments

Comments
 (0)