Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 26dbe34

Browse files
author
David Chung
authored
Support plugins that listen on network port (#488)
Signed-off-by: David Chung <[email protected]>
1 parent eb8aa70 commit 26dbe34

File tree

9 files changed

+181
-34
lines changed

9 files changed

+181
-34
lines changed

pkg/cli/serverutil.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,47 @@ func EnsureDirExists(dir string) {
1919
// RunPlugin runs a plugin server, advertising with the provided name for discovery.
2020
// The plugin should conform to the rpc call convention as implemented in the rpc package.
2121
func RunPlugin(name string, plugin server.VersionedInterface, more ...server.VersionedInterface) {
22-
2322
dir := local.Dir()
2423
EnsureDirExists(dir)
2524

2625
socketPath := path.Join(dir, name)
2726
pidPath := path.Join(dir, name+".pid")
27+
run("", socketPath, pidPath, plugin, more...)
28+
}
2829

29-
stoppable, err := server.StartPluginAtPath(socketPath, plugin, more...)
30-
if err != nil {
31-
logrus.Error(err)
30+
// RunListener runs a plugin server, listening at listen address, and
31+
// advertising with the provided name for discovery.
32+
// The plugin should conform to the rpc call convention as implemented in the rpc package.
33+
func RunListener(listen, name string, plugin server.VersionedInterface, more ...server.VersionedInterface) {
34+
dir := local.Dir()
35+
EnsureDirExists(dir)
36+
37+
discoverPath := path.Join(dir, name+".listen")
38+
pidPath := path.Join(dir, name+".pid")
39+
run(listen, discoverPath, pidPath, plugin, more...)
40+
}
41+
42+
func run(listen, discoverPath, pidPath string,
43+
plugin server.VersionedInterface, more ...server.VersionedInterface) {
44+
45+
var stoppable server.Stoppable
46+
47+
if listen != "" {
48+
s, err := server.StartListenerAtPath(listen, discoverPath, plugin, more...)
49+
if err != nil {
50+
logrus.Error(err)
51+
}
52+
stoppable = s
53+
} else {
54+
s, err := server.StartPluginAtPath(discoverPath, plugin, more...)
55+
if err != nil {
56+
logrus.Error(err)
57+
}
58+
stoppable = s
3259
}
3360

3461
// write PID file
35-
err = ioutil.WriteFile(pidPath, []byte(fmt.Sprintf("%v", os.Getpid())), 0644)
62+
err := ioutil.WriteFile(pidPath, []byte(fmt.Sprintf("%v", os.Getpid())), 0644)
3663
if err != nil {
3764
logrus.Error(err)
3865
}
@@ -44,4 +71,7 @@ func RunPlugin(name string, plugin server.VersionedInterface, more ...server.Ver
4471
// clean up
4572
os.Remove(pidPath)
4673
logrus.Infoln("Removed PID file at", pidPath)
74+
75+
os.Remove(discoverPath)
76+
logrus.Infoln("Removed discover file at", discoverPath)
4777
}

pkg/discovery/discovery.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@ const (
1818
PluginDirEnvVar = "INFRAKIT_PLUGINS_DIR"
1919
)
2020

21-
// ErrNotUnixSocket is the error raised when the file is not a unix socket
22-
type ErrNotUnixSocket string
21+
// ErrNotUnixSocketOrListener is the error raised when the file is not a unix socket
22+
type ErrNotUnixSocketOrListener string
2323

24-
func (e ErrNotUnixSocket) Error() string {
25-
return fmt.Sprintf("not a unix socket:%s", string(e))
24+
func (e ErrNotUnixSocketOrListener) Error() string {
25+
return fmt.Sprintf("not a unix socket or listener:%s", string(e))
2626
}
2727

28-
// IsErrNotUnixSocket returns true if the error is due to the file not being a valid unix socket.
29-
func IsErrNotUnixSocket(e error) bool {
30-
_, is := e.(ErrNotUnixSocket)
28+
// IsErrNotUnixSocketOrListener returns true if the error is due to the file not being a valid unix socket.
29+
func IsErrNotUnixSocketOrListener(e error) bool {
30+
_, is := e.(ErrNotUnixSocketOrListener)
3131
return is
3232
}
3333

pkg/discovery/discovery_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"github.com/stretchr/testify/require"
77
)
88

9-
func TestErrNotUnixSocket(t *testing.T) {
10-
err := ErrNotUnixSocket("no socket!")
9+
func TestErrNotUnixSocketOrListener(t *testing.T) {
10+
err := ErrNotUnixSocketOrListener("no socket!")
1111
require.Error(t, err)
12-
require.True(t, IsErrNotUnixSocket(err))
12+
require.True(t, IsErrNotUnixSocketOrListener(err))
1313
}

pkg/discovery/local/dir.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package local
33
import (
44
"fmt"
55
"io/ioutil"
6+
"net/url"
67
"os"
78
"os/user"
89
"path/filepath"
10+
"strings"
911
"sync"
1012

1113
"github.com/docker/infrakit/pkg/discovery"
@@ -81,16 +83,32 @@ func newDirPluginDiscovery(dir string) (*dirPluginDiscovery, error) {
8183
}
8284

8385
func (r *dirPluginDiscovery) dirLookup(entry os.FileInfo) (*plugin.Endpoint, error) {
84-
socketPath := filepath.Join(r.dir, entry.Name())
85-
if entry.Mode()&os.ModeSocket != 0 {
86+
path := filepath.Join(r.dir, entry.Name())
87+
88+
switch {
89+
90+
case entry.Mode()&os.ModeSocket != 0:
8691
return &plugin.Endpoint{
8792
Protocol: "unix",
88-
Address: socketPath,
93+
Address: path,
8994
Name: entry.Name(),
9095
}, nil
96+
97+
case entry.Mode()&os.ModeType == 0 && filepath.Ext(path) == ".listen":
98+
if buff, err := ioutil.ReadFile(path); err == nil {
99+
// content should be a url
100+
i := strings.Index(entry.Name(), ".listen")
101+
if u, err := url.Parse(string(buff)); err == nil {
102+
return &plugin.Endpoint{
103+
Protocol: u.Scheme,
104+
Address: u.String(),
105+
Name: entry.Name()[0:i],
106+
}, nil
107+
}
108+
}
91109
}
92110

93-
return nil, discovery.ErrNotUnixSocket(socketPath)
111+
return nil, discovery.ErrNotUnixSocketOrListener(path)
94112
}
95113

96114
// List returns a list of plugins known, keyed by the name
@@ -112,7 +130,7 @@ func (r *dirPluginDiscovery) List() (map[string]*plugin.Endpoint, error) {
112130
instance, err := r.dirLookup(entry)
113131

114132
if err != nil {
115-
if !discovery.IsErrNotUnixSocket(err) {
133+
if !discovery.IsErrNotUnixSocketOrListener(err) {
116134
log.Warn("Err loading plugin", "err", err)
117135
}
118136
continue

pkg/discovery/local/dir_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/docker/infrakit/pkg/plugin"
1111
rpc "github.com/docker/infrakit/pkg/rpc/instance"
1212
"github.com/docker/infrakit/pkg/rpc/server"
13+
. "github.com/docker/infrakit/pkg/testing"
1314
"github.com/stretchr/testify/require"
1415
)
1516

@@ -30,15 +31,17 @@ func TestDirDiscovery(t *testing.T) {
3031
dir, err := ioutil.TempDir("", "infrakit_dir_test")
3132
require.NoError(t, err)
3233

34+
T(100).Infoln("Starting server1")
3335
name1 := "server1"
3436
path1 := filepath.Join(dir, name1)
3537
server1, err := server.StartPluginAtPath(path1, rpc.PluginServer(nil))
3638
require.NoError(t, err)
3739
require.NotNil(t, server1)
3840

41+
T(100).Infoln("Starting server2")
3942
name2 := "server2"
40-
path2 := filepath.Join(dir, name2)
41-
server2, err := server.StartPluginAtPath(path2, rpc.PluginServer(nil))
43+
path2 := filepath.Join(dir, name2+".listen")
44+
server2, err := server.StartListenerAtPath("localhost:7777", path2, rpc.PluginServer(nil))
4245
require.NoError(t, err)
4346
require.NotNil(t, server2)
4447

pkg/rpc/client/client.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package client
33
import (
44
"bytes"
55
"fmt"
6+
"io/ioutil"
67
"net"
78
"net/http"
89
"net/http/httputil"
910
"net/url"
11+
"path"
1012
"sync"
1113

1214
log "github.com/Sirupsen/logrus"
@@ -22,10 +24,6 @@ type client struct {
2224

2325
// New creates a new Client that communicates with a unix socket and validates the remote API.
2426
func New(address string, api spi.InterfaceSpec) (Client, error) {
25-
u, err := url.Parse(address)
26-
if err != nil {
27-
return nil, err
28-
}
2927

3028
u, httpC, err := parseAddress(address)
3129
if err != nil {
@@ -45,6 +43,15 @@ func New(address string, api spi.InterfaceSpec) (Client, error) {
4543
}
4644

4745
func parseAddress(address string) (*url.URL, *http.Client, error) {
46+
47+
if path.Ext(address) == ".listen" {
48+
buff, err := ioutil.ReadFile(address)
49+
if err != nil {
50+
return nil, nil, err
51+
}
52+
address = string(buff)
53+
}
54+
4855
u, err := url.Parse(address)
4956
if err != nil {
5057
return nil, nil, err
@@ -60,7 +67,10 @@ func parseAddress(address string) (*url.URL, *http.Client, error) {
6067
return net.Dial("unix", address)
6168
},
6269
}}, nil
63-
case "http", "https", "tcp":
70+
case "tcp":
71+
u.Scheme = "http"
72+
return u, &http.Client{}, nil
73+
case "http", "https":
6474
return u, &http.Client{}, nil
6575

6676
default:

pkg/rpc/client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestParseAddress(t *testing.T) {
2121
u, c, err = parseAddress("tcp://host:9090/foo/bar/baz")
2222
require.NoError(t, err)
2323
require.Nil(t, c.Transport)
24-
require.Equal(t, "tcp://host:9090/foo/bar/baz", u.String())
24+
require.Equal(t, "http://host:9090/foo/bar/baz", u.String())
2525

2626
u, c, err = parseAddress("https://host:9090")
2727
require.NoError(t, err)

pkg/rpc/server/server.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package server
22

33
import (
44
"fmt"
5+
"io/ioutil"
56
"net"
67
"net/http"
78
"net/http/httptest"
89
"net/http/httputil"
10+
"os"
911
"time"
1012

1113
log "github.com/Sirupsen/logrus"
@@ -76,9 +78,22 @@ type VersionedInterface interface {
7678
ImplementedInterface() spi.InterfaceSpec
7779
}
7880

81+
// StartListenerAtPath starts an HTTP server listening on tcp port with discovery entry at specified path.
82+
// Returns a Stoppable that can be used to stop or block on the server.
83+
func StartListenerAtPath(listen, discoverPath string,
84+
receiver VersionedInterface, more ...VersionedInterface) (Stoppable, error) {
85+
return startAtPath(listen, discoverPath, receiver, more...)
86+
}
87+
7988
// StartPluginAtPath starts an HTTP server listening on a unix socket at the specified path.
8089
// Returns a Stoppable that can be used to stop or block on the server.
8190
func StartPluginAtPath(socketPath string, receiver VersionedInterface, more ...VersionedInterface) (Stoppable, error) {
91+
return startAtPath("", socketPath, receiver, more...)
92+
}
93+
94+
func startAtPath(listen, discoverPath string,
95+
receiver VersionedInterface, more ...VersionedInterface) (Stoppable, error) {
96+
8297
server := rpc.NewServer()
8398
server.RegisterCodec(json2.NewCodec(), "application/json")
8499

@@ -160,22 +175,50 @@ func StartPluginAtPath(socketPath string, receiver VersionedInterface, more ...V
160175

161176
gracefulServer := graceful.Server{
162177
Timeout: 10 * time.Second,
163-
Server: &http.Server{Addr: fmt.Sprintf("unix://%s", socketPath), Handler: router},
164178
}
165179

166-
listener, err := net.Listen("unix", socketPath)
167-
if err != nil {
168-
return nil, err
169-
}
180+
var listener net.Listener
181+
182+
if listen != "" {
183+
gracefulServer.Server = &http.Server{
184+
Addr: listen,
185+
Handler: router,
186+
}
187+
l, err := net.Listen("tcp", listen)
188+
if err != nil {
189+
return nil, err
190+
}
191+
listener = l
192+
193+
if err := ioutil.WriteFile(discoverPath, []byte(fmt.Sprintf("tcp://%s", listen)), 0644); err != nil {
194+
return nil, err
195+
}
170196

171-
log.Infof("Listening at: %s", socketPath)
197+
log.Infof("Listening at: %s, discoverable at %s", listen, discoverPath)
198+
199+
} else {
200+
gracefulServer.Server = &http.Server{
201+
Addr: fmt.Sprintf("unix://%s", discoverPath),
202+
Handler: router,
203+
}
204+
l, err := net.Listen("unix", discoverPath)
205+
if err != nil {
206+
return nil, err
207+
}
208+
listener = l
209+
log.Infof("Listening at: %s", discoverPath)
210+
211+
}
172212

173213
go func() {
174214
err := gracefulServer.Serve(listener)
175215
if err != nil {
176216
log.Warn(err)
177217
}
178218
events.Stop()
219+
if listen != "" {
220+
os.Remove(discoverPath)
221+
}
179222
}()
180223

181224
return &stoppableServer{server: &gracefulServer}, nil

pkg/rpc/server/server_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,46 @@ func TestUnixSocketServer(t *testing.T) {
5959

6060
server.Stop()
6161
}
62+
63+
func TestTCPServer(t *testing.T) {
64+
ctrl := gomock.NewController(t)
65+
defer ctrl.Finish()
66+
67+
mock := plugin_mock.NewMockPlugin(ctrl)
68+
69+
instanceID := instance.ID("id")
70+
spec := instance.Spec{
71+
Tags: map[string]string{
72+
"tag1": "value1",
73+
},
74+
Init: "init",
75+
}
76+
77+
properties := types.AnyString(`{"foo":"bar"}`)
78+
validateErr := errors.New("validate-error")
79+
80+
gomock.InOrder(
81+
mock.EXPECT().Validate(properties).Return(validateErr),
82+
mock.EXPECT().Provision(spec).Return(&instanceID, nil),
83+
)
84+
85+
service := plugin_rpc.PluginServer(mock)
86+
87+
discover := filepath.Join(os.TempDir(), fmt.Sprintf("%d.listen", time.Now().Unix()))
88+
name := plugin.Name(filepath.Base(discover))
89+
server, err := StartListenerAtPath("localhost:7777", discover, service)
90+
require.NoError(t, err)
91+
92+
c, err := plugin_rpc.NewClient(name, discover)
93+
require.NoError(t, err)
94+
95+
err = c.Validate(properties)
96+
require.Error(t, err)
97+
require.Equal(t, validateErr.Error(), err.Error())
98+
99+
id, err := c.Provision(spec)
100+
require.NoError(t, err)
101+
require.Equal(t, instanceID, *id)
102+
103+
server.Stop()
104+
}

0 commit comments

Comments
 (0)