Skip to content

Commit 23a619b

Browse files
committed
hashicorp/go-plugin
1 parent 514fe74 commit 23a619b

File tree

289 files changed

+177777
-14
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

289 files changed

+177777
-14
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Build
22
build/
33
/kafka-proxy
4+
testcerts/
45

56
# Intellij
67
.idea/
@@ -57,4 +58,4 @@ Session.vim
5758
.netrwhist
5859
*~
5960
# Auto-generated tag files
60-
tags
61+
tags

Gopkg.lock

Lines changed: 105 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,11 @@ build.docker-build.osx:
6868
docker rm $$buildContainer ;\
6969
docker rmi $$buildContainerName ;\
7070

71+
protoc.auth:
72+
protoc -I plugin/auth/proto/ plugin/auth/proto/auth.proto --go_out=plugins=grpc:plugin/auth/proto/
73+
74+
plugin.auth-user:
75+
CGO_ENABLED=0 go build -o build/auth-user $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-auth-user/main.go
76+
7177
clean:
72-
@rm -rf build
78+
@rm -rf build

README.md

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ responses received from the brokers are replaced by local counterparts.
1414
For discovered brokers (not configured as the boostrap servers), local listeners are started on random ports.
1515
The dynamic local listeners feature can be disabled and an additional list of external server mappings can be provided.
1616

17-
The Proxy can terminate TLS traffic and authenticate users locally using SASL/PLAIN. The credentials check method
18-
is configurable by providing dynamically linked shared object library.
17+
The Proxy can terminate TLS traffic and authenticate users locally using SASL/PLAIN. The credentials verification method
18+
is configurable and uses golang plugin system over RPC.
1919

2020
Kafka API calls can be restricted to prevent some operations e.g. topic deletion.
2121

@@ -40,11 +40,23 @@ See:
4040
--tls-enable --tls-insecure-skip-verify \
4141
--sasl-enable -sasl-username myuser --sasl-password mysecret
4242

43-
build/kafka-proxy server --proxy-listener-key-file "server-key.pem" \
43+
make clean build plugin.auth-user && build/kafka-proxy server \
44+
--proxy-listener-auth-enable \
45+
--proxy-listener-auth-command build/auth-user \
46+
--proxy-listener-auth-param "--username=my-test-user" \
47+
--proxy-listener-auth-param "--password=my-test-password" \
48+
--dynamic-listeners-disable \
49+
--forbidden-api-keys 20 \
50+
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32401"
51+
52+
make clean build plugin.auth-user && build/kafka-proxy server --proxy-listener-key-file "server-key.pem" \
4453
--proxy-listener-cert-file "server-cert.pem" \
4554
--proxy-listener-ca-chain-cert-file "ca.pem" \
4655
--proxy-listener-tls-enable \
4756
--proxy-listener-auth-enable \
57+
--proxy-listener-auth-command build/auth-user \
58+
--proxy-listener-auth-param "--username=my-test-user" \
59+
--proxy-listener-auth-param "--password=my-test-password" \
4860
--dynamic-listeners-disable \
4961
--forbidden-api-keys 20 \
5062
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32401" \
@@ -65,7 +77,7 @@ See:
6577
3. counter: proxy_connections_total {broker}
6678
4. counter: proxy_requests_bytes {broker}
6779
5. counter: proxy_responses_bytes {broker}
68-
* [ ] Plugable local authentication
80+
* [X] Pluggable local authentication
6981
* [ ] Deploying Kafka Proxy as a sidecar container
7082
* [ ] Performance tests and tuning
7183
* [ ] Socket buffer sizing e.g. SO_RCVBUF = 32768, SO_SNDBUF = 131072

cmd/kafka-proxy/server.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@ import (
1313
"net/http"
1414
_ "net/http/pprof"
1515
"os"
16+
"os/exec"
1617
"os/signal"
1718
"syscall"
1819
"time"
20+
21+
"errors"
22+
"github.com/grepplabs/kafka-proxy/plugin/auth/shared"
23+
"github.com/hashicorp/go-plugin"
1924
)
2025

2126
var (
@@ -69,7 +74,10 @@ func init() {
6974
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyPassword, "proxy-listener-key-password", "", "Password to decrypt rsa private key")
7075
Server.Flags().StringVar(&c.Proxy.TLS.CAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file")
7176

77+
// authentication plugin
7278
Server.Flags().BoolVar(&c.Proxy.Auth.Enable, "proxy-listener-auth-enable", false, "Enable SASL/PLAIN listener authentication")
79+
Server.Flags().StringVar(&c.Proxy.Auth.Command, "proxy-listener-auth-command", "", "Path to authentication plugin binary")
80+
Server.Flags().StringSliceVar(&c.Proxy.Auth.Parameters, "proxy-listener-auth-param", []string{}, "Authentication plugin parameter")
7381

7482
// kafka
7583
Server.Flags().StringVar(&c.Kafka.ClientID, "kafka-client-id", "kafka-proxy", "An optional identifier to track the source of requests")
@@ -116,6 +124,34 @@ func init() {
116124
func Run(_ *cobra.Command, _ []string) {
117125
logrus.Infof("Starting kafka-proxy version %s", config.Version)
118126

127+
var passwordAuthenticator shared.PasswordAuthenticator
128+
if c.Proxy.Auth.Enable {
129+
client := plugin.NewClient(&plugin.ClientConfig{
130+
HandshakeConfig: shared.Handshake,
131+
Plugins: shared.PluginMap,
132+
SyncStdout: os.Stdout,
133+
SyncStderr: os.Stderr,
134+
Cmd: exec.Command(c.Proxy.Auth.Command, c.Proxy.Auth.Parameters...),
135+
AllowedProtocols: []plugin.Protocol{
136+
plugin.ProtocolNetRPC, plugin.ProtocolGRPC},
137+
})
138+
defer client.Kill()
139+
rpcClient, err := client.Client()
140+
if err != nil {
141+
logrus.Fatal(err)
142+
}
143+
raw, err := rpcClient.Dispense("passwordAuthenticator")
144+
if err != nil {
145+
logrus.Fatal(err)
146+
}
147+
var ok bool
148+
passwordAuthenticator, ok = raw.(shared.PasswordAuthenticator)
149+
if !ok {
150+
logrus.Fatal(errors.New("unsupported plugin type"))
151+
}
152+
}
153+
_ = passwordAuthenticator
154+
119155
var g group.Group
120156
{
121157
// All active connections are stored in this variable.
@@ -129,7 +165,7 @@ func Run(_ *cobra.Command, _ []string) {
129165
if err != nil {
130166
logrus.Fatal(err)
131167
}
132-
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping)
168+
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, passwordAuthenticator)
133169
if err != nil {
134170
logrus.Fatal(err)
135171
}
@@ -179,6 +215,7 @@ func Run(_ *cobra.Command, _ []string) {
179215
debugListener.Close()
180216
})
181217
}
218+
182219
err := g.Run()
183220
logrus.Info("Exit", err)
184221
}

cmd/plugin-auth-user/main.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"github.com/grepplabs/kafka-proxy/plugin/auth/shared"
6+
"github.com/hashicorp/go-plugin"
7+
"github.com/sirupsen/logrus"
8+
"os"
9+
)
10+
11+
type PasswordAuthenticator struct {
12+
username string
13+
password string
14+
}
15+
16+
func (pa PasswordAuthenticator) Authenticate(username, password string) (bool, int32, error) {
17+
// logrus.Printf("Authenticate request for %s:%s,expected %s:%s ", username, password, pa.username, pa.password)
18+
return username == pa.username && password == pa.password, 0, nil
19+
}
20+
21+
type PluginMeta struct {
22+
flagUsername string
23+
flagPassword string
24+
}
25+
26+
func (f *PluginMeta) FlagSet() *flag.FlagSet {
27+
fs := flag.NewFlagSet("auth plugin settings", flag.ContinueOnError)
28+
fs.StringVar(&f.flagUsername, "username", "", "")
29+
fs.StringVar(&f.flagPassword, "password", "", "")
30+
return fs
31+
}
32+
33+
func main() {
34+
35+
pluginMeta := &PluginMeta{}
36+
flags := pluginMeta.FlagSet()
37+
flags.Parse(os.Args[1:])
38+
39+
if pluginMeta.flagUsername == "" || pluginMeta.flagPassword == "" {
40+
logrus.Errorf("parameters username and password are required")
41+
os.Exit(1)
42+
}
43+
44+
plugin.Serve(&plugin.ServeConfig{
45+
HandshakeConfig: shared.Handshake,
46+
Plugins: map[string]plugin.Plugin{
47+
"passwordAuthenticator": &shared.PasswordAuthenticatorPlugin{Impl: &PasswordAuthenticator{
48+
username: pluginMeta.flagUsername,
49+
password: pluginMeta.flagPassword,
50+
}},
51+
},
52+
// A non-nil value here enables gRPC serving for this plugin...
53+
GRPCServer: plugin.DefaultGRPCServer,
54+
})
55+
}

config/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ type Config struct {
5757
CAChainCertFile string
5858
}
5959
Auth struct {
60-
Enable bool
60+
Enable bool
61+
Command string
62+
Parameters []string
6163
}
6264
}
6365
Kafka struct {
@@ -205,5 +207,8 @@ func (c *Config) Validate() error {
205207
if c.Proxy.TLS.Enable && (c.Proxy.TLS.ListenerKeyFile == "" || c.Proxy.TLS.ListenerCertFile == "") {
206208
return errors.New("ListenerKeyFile and ListenerCertFile are required when Proxy TLS is enabled")
207209
}
210+
if c.Proxy.Auth.Enable && c.Proxy.Auth.Command == "" {
211+
return errors.New("Auth.Command is required when Proxy.Auth is enabled")
212+
}
208213
return nil
209214
}

0 commit comments

Comments
 (0)