Skip to content

Commit d93a8b4

Browse files
committed
Config proxy initiated oauthbearer auth. Implement unsecured-jwt-provider plugin
1 parent 5edf5c4 commit d93a8b4

File tree

8 files changed

+220
-19
lines changed

8 files changed

+220
-19
lines changed

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ plugin.google-id-info:
6767
plugin.unsecured-jwt-info:
6868
CGO_ENABLED=0 go build -o build/unsecured-jwt-info $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-info/main.go
6969

70-
all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info
70+
plugin.unsecured-jwt-provider:
71+
CGO_ENABLED=0 go build -o build/unsecured-jwt-provider $(BUILD_FLAGS) -ldflags "$(LDFLAGS)" cmd/plugin-unsecured-jwt-provider/main.go
72+
73+
74+
all: build plugin.auth-user plugin.auth-ldap plugin.google-id-provider plugin.google-id-info plugin.unsecured-jwt-info plugin.unsecured-jwt-provider
7175

7276
clean:
7377
@rm -rf build

README.md

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ See:
3434

3535
Linux
3636

37-
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.8/kafka-proxy_0.0.8_linux_amd64.tar.gz | tar xz
37+
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_linux_amd64.tar.gz | tar xz
3838

3939
macOS
4040

41-
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.0.8/kafka-proxy_0.0.8_darwin_amd64.tar.gz | tar xz
41+
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_darwin_amd64.tar.gz | tar xz
4242

4343
2. Move the binary in to your PATH.
4444

@@ -85,7 +85,7 @@ See:
8585
--dynamic-listeners-disable Disable dynamic listeners.
8686
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started
8787
--forbidden-api-keys intSlice Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics
88-
--forward-proxy string URL of the forward proxy. Supported schemas are http and socks5
88+
--forward-proxy string URL of the forward proxy. Supported schemas are socks5 and http
8989
-h, --help help for server
9090
--http-disable Disable HTTP endpoints
9191
--http-health-path string Path on which to health endpoint (default "/health")
@@ -113,9 +113,15 @@ See:
113113
--proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
114114
--proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096)
115115
--proxy-response-buffer-size int Response buffer size pro tcp connection (default 4096)
116-
--sasl-enable Connect using SASL/PLAIN
116+
--sasl-enable Connect using SASL
117117
--sasl-jaas-config-file string Location of JAAS config file with SASL username and password
118118
--sasl-password string SASL user password
119+
--sasl-plugin-command string Path to authentication plugin binary
120+
--sasl-plugin-enable Use plugin for SASL authentication
121+
--sasl-plugin-log-level string Log level of the auth plugin (default "trace")
122+
--sasl-plugin-mechanism string SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER (default "OAUTHBEARER")
123+
--sasl-plugin-param stringArray Authentication plugin parameter
124+
--sasl-plugin-timeout duration Authentication timeout (default 10s)
119125
--sasl-username string SASL user name
120126
--tls-ca-chain-cert-file string PEM encoded CA's certificate file
121127
--tls-client-cert-file string PEM encoded file with client certificate
@@ -124,8 +130,6 @@ See:
124130
--tls-enable Whether or not to use TLS when connecting to the broker
125131
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name
126132
127-
128-
129133
### Usage example
130134
131135
kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,0.0.0.0:32399"
@@ -145,14 +149,29 @@ See:
145149
--external-server-mapping "192.168.99.100:32402,127.0.0.1:32403" \
146150
--forbidden-api-keys 20
147151
152+
153+
export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server
154+
155+
### SASL authentication initiated by proxy example
156+
157+
SASL authentication is initiated by the proxy. SASL authentication is disabled on the clients and enabled on the Kafka brokers.
158+
148159
kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
149160
--tls-enable --tls-insecure-skip-verify \
150161
--sasl-enable --sasl-username myuser --sasl-password mysecret
151162
152-
export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server
163+
make clean build plugin.unsecured-jwt-provider && build/kafka-proxy server \
164+
--sasl-enable \
165+
--sasl-plugin-enable \
166+
--sasl-plugin-mechanism "OAUTHBEARER" \
167+
--sasl-plugin-command build/unsecured-jwt-provider \
168+
--sasl-plugin-param "--claim-sub=alice" \
169+
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"
153170
154171
### Proxy authentication example
155172
173+
SASL authentication is performed by the proxy. SASL authentication is enabled on the clients and disabled on the Kafka brokers.
174+
156175
make clean build plugin.auth-user && build/kafka-proxy server --proxy-listener-key-file "server-key.pem" \
157176
--proxy-listener-cert-file "server-cert.pem" \
158177
--proxy-listener-ca-chain-cert-file "ca.pem" \

cmd/kafka-proxy/server.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,20 @@ func initFlags() {
143143
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", "", "Password to decrypt rsa private key")
144144
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")
145145

146-
// SASL
147-
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL/PLAIN")
146+
// SASL by Proxy
147+
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL")
148148
Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name")
149149
Server.Flags().StringVar(&c.Kafka.SASL.Password, "sasl-password", "", "SASL user password")
150150
Server.Flags().StringVar(&c.Kafka.SASL.JaasConfigFile, "sasl-jaas-config-file", "", "Location of JAAS config file with SASL username and password")
151151

152+
// SASL by Proxy plugin
153+
Server.Flags().BoolVar(&c.Kafka.SASL.Plugin.Enable, "sasl-plugin-enable", false, "Use plugin for SASL authentication")
154+
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Command, "sasl-plugin-command", "", "Path to authentication plugin binary")
155+
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Mechanism, "sasl-plugin-mechanism", "OAUTHBEARER", "SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER")
156+
Server.Flags().StringArrayVar(&c.Kafka.SASL.Plugin.Parameters, "sasl-plugin-param", []string{}, "Authentication plugin parameter")
157+
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.LogLevel, "sasl-plugin-log-level", "trace", "Log level of the auth plugin")
158+
Server.Flags().DurationVar(&c.Kafka.SASL.Plugin.Timeout, "sasl-plugin-timeout", 10*time.Second, "Authentication timeout")
159+
152160
// Web
153161
Server.Flags().BoolVar(&c.Http.Disable, "http-disable", false, "Disable HTTP endpoints")
154162
Server.Flags().StringVar(&c.Http.ListenAddress, "http-listen-address", "0.0.0.0:9080", "Address that kafka-proxy is listening on")
@@ -235,6 +243,41 @@ func Run(_ *cobra.Command, _ []string) {
235243
}
236244
}
237245

246+
var saslTokenProvider apis.TokenProvider
247+
if c.Kafka.SASL.Plugin.Enable {
248+
switch c.Kafka.SASL.Plugin.Mechanism {
249+
case "OAUTHBEARER":
250+
var err error
251+
factory, ok := registry.GetComponent(new(apis.TokenProviderFactory), c.Kafka.SASL.Plugin.Command).(apis.TokenProviderFactory)
252+
if ok {
253+
logrus.Infof("Using built-in '%s' TokenProvider for sasl authentication", c.Kafka.SASL.Plugin.Command)
254+
255+
saslTokenProvider, err = factory.New(c.Kafka.SASL.Plugin.Parameters)
256+
if err != nil {
257+
logrus.Fatal(err)
258+
}
259+
} else {
260+
client := NewPluginClient(tokenprovider.Handshake, tokenprovider.PluginMap, c.Kafka.SASL.Plugin.LogLevel, c.Kafka.SASL.Plugin.Command, c.Kafka.SASL.Plugin.Parameters)
261+
defer client.Kill()
262+
263+
rpcClient, err := client.Client()
264+
if err != nil {
265+
logrus.Fatal(err)
266+
}
267+
raw, err := rpcClient.Dispense("tokenProvider")
268+
if err != nil {
269+
logrus.Fatal(err)
270+
}
271+
saslTokenProvider, ok = raw.(apis.TokenProvider)
272+
if !ok {
273+
logrus.Fatal(errors.New("unsupported TokenProvider plugin type"))
274+
}
275+
}
276+
default:
277+
logrus.Fatal(errors.New("unsupported sasl auth mechanism"))
278+
}
279+
}
280+
238281
var gatewayTokenProvider apis.TokenProvider
239282
if c.Auth.Gateway.Client.Enable {
240283
var err error
@@ -307,7 +350,7 @@ func Run(_ *cobra.Command, _ []string) {
307350
if err != nil {
308351
logrus.Fatal(err)
309352
}
310-
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, localPasswordAuthenticator, localTokenAuthenticator, gatewayTokenProvider, gatewayTokenInfo)
353+
proxyClient, err := proxy.NewClient(connset, c, listeners.GetNetAddressMapping, localPasswordAuthenticator, localTokenAuthenticator, saslTokenProvider, gatewayTokenProvider, gatewayTokenInfo)
311354
if err != nil {
312355
logrus.Fatal(err)
313356
}

cmd/plugin-unsecured-jwt-info/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type pluginMeta struct {
4545

4646
func (f *pluginMeta) flagSet() *flag.FlagSet {
4747
fs := flag.NewFlagSet("unsecured-jwt-info info settings", flag.ContinueOnError)
48+
fs.Var(&f.claimSub, "claim-sub", "Allowed subject claim (user name)")
4849
return fs
4950
}
5051

@@ -134,8 +135,7 @@ func getVerifyResponseResponse(status int) (apis.VerifyResponse, error) {
134135
func main() {
135136
pluginMeta := &pluginMeta{}
136137
fs := pluginMeta.flagSet()
137-
fs.Var(&pluginMeta.claimSub, "claim-sub", "Allowed subject claim (user name)")
138-
fs.Parse(os.Args[1:])
138+
_ = fs.Parse(os.Args[1:])
139139

140140
logrus.Infof("Unsecured JWT sub claims: %v", pluginMeta.claimSub)
141141

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"github.com/grepplabs/kafka-proxy/pkg/apis"
7+
"github.com/grepplabs/kafka-proxy/plugin/token-provider/shared"
8+
"github.com/hashicorp/go-plugin"
9+
"github.com/sirupsen/logrus"
10+
"golang.org/x/oauth2/jws"
11+
"os"
12+
)
13+
14+
const (
15+
StatusOK = 0
16+
StatusEncodeError = 1
17+
AlgorithmNone = "none"
18+
)
19+
20+
type UnsecuredJWTProvider struct {
21+
claimSub string
22+
}
23+
24+
func (v UnsecuredJWTProvider) GetToken(ctx context.Context, request apis.TokenRequest) (apis.TokenResponse, error) {
25+
token, err := v.encodeToken()
26+
if err != nil {
27+
return getGetTokenResponse(StatusEncodeError, "")
28+
}
29+
30+
return getGetTokenResponse(StatusOK, token)
31+
}
32+
33+
func getGetTokenResponse(status int, token string) (apis.TokenResponse, error) {
34+
success := status == StatusOK
35+
return apis.TokenResponse{Success: success, Status: int32(status), Token: token}, nil
36+
}
37+
38+
func (v UnsecuredJWTProvider) encodeToken() (string, error) {
39+
header := &jws.Header{
40+
Algorithm: AlgorithmNone,
41+
}
42+
claims := &jws.ClaimSet{
43+
Sub: v.claimSub,
44+
}
45+
signer := func(data []byte) (sig []byte, err error) {
46+
return []byte{}, nil
47+
}
48+
return jws.EncodeWithSigner(header, claims, signer)
49+
}
50+
51+
type pluginMeta struct {
52+
claimSub string
53+
}
54+
55+
func (f *pluginMeta) flagSet() *flag.FlagSet {
56+
fs := flag.NewFlagSet("unsecured-jwt-info info settings", flag.ContinueOnError)
57+
fs.StringVar(&f.claimSub, "claim-sub", "", "subject claim")
58+
return fs
59+
}
60+
61+
func main() {
62+
pluginMeta := &pluginMeta{}
63+
flags := pluginMeta.flagSet()
64+
_ = flags.Parse(os.Args[1:])
65+
66+
if pluginMeta.claimSub == "" {
67+
logrus.Errorf("parameter claim-sub is required")
68+
os.Exit(1)
69+
}
70+
71+
unsecuredJWTProvider := &UnsecuredJWTProvider{
72+
claimSub: pluginMeta.claimSub,
73+
}
74+
75+
plugin.Serve(&plugin.ServeConfig{
76+
HandshakeConfig: shared.Handshake,
77+
Plugins: map[string]plugin.Plugin{
78+
"unsecuredJWTProvider": &shared.TokenProviderPlugin{Impl: unsecuredJWTProvider},
79+
},
80+
// A non-nil value here enables gRPC serving for this plugin...
81+
GRPCServer: plugin.DefaultGRPCServer,
82+
})
83+
}

config/config.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,14 @@ type Config struct {
120120
Username string
121121
Password string
122122
JaasConfigFile string
123+
Plugin struct {
124+
Enable bool
125+
Command string
126+
Mechanism string
127+
Parameters []string
128+
LogLevel string
129+
Timeout time.Duration
130+
}
123131
}
124132
}
125133
ForwardProxy struct {
@@ -211,8 +219,26 @@ func NewConfig() *Config {
211219
}
212220

213221
func (c *Config) Validate() error {
214-
if c.Kafka.SASL.Enable && (c.Kafka.SASL.Username == "" || c.Kafka.SASL.Password == "") {
215-
return errors.New("SASL.Username and SASL.Password are required when SASL is enabled")
222+
if c.Kafka.SASL.Enable {
223+
if c.Kafka.SASL.Plugin.Enable {
224+
if c.Kafka.SASL.Plugin.Command == "" {
225+
return errors.New("Command is required when Kafka.SASL.Plugin.Enable is enabled")
226+
}
227+
if c.Kafka.SASL.Plugin.Timeout <= 0 {
228+
return errors.New("Kafka.SASL.Plugin.Timeout must be greater than 0")
229+
}
230+
if c.Kafka.SASL.Plugin.Mechanism != "OAUTHBEARER" {
231+
return errors.New("Mechanism OAUTHBEARER is required when Kafka.SASL.Plugin.Enable is enabled")
232+
}
233+
} else {
234+
if c.Kafka.SASL.Username == "" || c.Kafka.SASL.Password == "" {
235+
return errors.New("SASL.Username and SASL.Password are required when SASL is enabled and plugin is not used")
236+
}
237+
}
238+
} else {
239+
if c.Kafka.SASL.Plugin.Enable {
240+
return errors.New("Kafka.SASL.Plugin.Enable must be disabled, when SASL is disabled")
241+
}
216242
}
217243
if c.Kafka.KeepAlive < 0 {
218244
return errors.New("KeepAlive must be greater or equal 0")

proxy/client.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type Client struct {
3838
authClient *AuthClient
3939
}
4040

41-
func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
41+
func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
4242
tlsConfig, err := newTLSClientConfig(c)
4343
if err != nil {
4444
return nil, err
@@ -70,15 +70,31 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
7070
if c.Auth.Gateway.Server.Enable && gatewayTokenInfo == nil {
7171
return nil, errors.New("Auth.Gateway.Server.Enable is enabled but tokenInfo is nil")
7272
}
73+
var saslAuthByProxy SASLAuthByProxy
74+
if c.Kafka.SASL.Plugin.Enable {
75+
if c.Kafka.SASL.Plugin.Mechanism == SASLOAuthBearer && saslTokenProvider != nil {
76+
saslAuthByProxy = &SASLOAuthBearerAuth{
77+
clientID: c.Kafka.ClientID,
78+
writeTimeout: c.Kafka.WriteTimeout,
79+
readTimeout: c.Kafka.ReadTimeout,
80+
tokenProvider: saslTokenProvider,
81+
}
82+
} else {
83+
return nil, errors.Errorf("SASLAuthByProxy plugin unsupported or plugin misconfiguration for mechanism '%s' ", c.Kafka.SASL.Plugin.Mechanism)
84+
}
7385

74-
return &Client{conns: conns, config: c, dialer: dialer, tcpConnOptions: tcpConnOptions, stopRun: make(chan struct{}, 1),
75-
saslAuthByProxy: &SASLPlainAuth{
86+
} else {
87+
saslAuthByProxy = &SASLPlainAuth{
7688
clientID: c.Kafka.ClientID,
7789
writeTimeout: c.Kafka.WriteTimeout,
7890
readTimeout: c.Kafka.ReadTimeout,
7991
username: c.Kafka.SASL.Username,
8092
password: c.Kafka.SASL.Password,
81-
},
93+
}
94+
}
95+
96+
return &Client{conns: conns, config: c, dialer: dialer, tcpConnOptions: tcpConnOptions, stopRun: make(chan struct{}, 1),
97+
saslAuthByProxy: saslAuthByProxy,
8298
authClient: &AuthClient{
8399
enabled: c.Auth.Gateway.Client.Enable,
84100
magic: c.Auth.Gateway.Client.Magic,

proxy/sasl_by_proxy.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/grepplabs/kafka-proxy/pkg/apis"
99
"github.com/grepplabs/kafka-proxy/proxy/protocol"
1010
"github.com/pkg/errors"
11+
"github.com/sirupsen/logrus"
1112
"io"
1213
"time"
1314
)
@@ -77,6 +78,12 @@ func (b *SASLPlainAuth) sendAndReceiveSASLAuth(conn DeadlineReaderWriter) error
7778
if handshakeErr != nil {
7879
return handshakeErr
7980
}
81+
return b.sendSaslAuthenticateRequest(conn)
82+
}
83+
84+
func (b *SASLPlainAuth) sendSaslAuthenticateRequest(conn DeadlineReaderWriter) error {
85+
logrus.Debugf("Sending authentication opaque packets, mechanism PLAIN")
86+
8087
length := 1 + len(b.username) + 1 + len(b.password)
8188
authBytes := make([]byte, length+4) //4 byte length header + auth data
8289
binary.BigEndian.PutUint32(authBytes, uint32(length))
@@ -110,6 +117,7 @@ func (b *SASLPlainAuth) sendAndReceiveSASLAuth(conn DeadlineReaderWriter) error
110117
}
111118

112119
func (b *SASLHandshake) sendAndReceiveHandshake(conn DeadlineReaderWriter) error {
120+
logrus.Debugf("Sending SaslHandshakeRequest")
113121

114122
req := &protocol.Request{
115123
ClientID: b.clientID,
@@ -195,6 +203,8 @@ func (b *SASLOAuthBearerAuth) sendAndReceiveSASLAuth(conn DeadlineReaderWriter)
195203
}
196204

197205
func (b *SASLOAuthBearerAuth) sendSaslAuthenticateRequest(token string, conn DeadlineReaderWriter) error {
206+
logrus.Debugf("Sending SaslAuthenticateRequest, mechanism OAUTHBEARER")
207+
198208
saslAuthReqV0 := protocol.SaslAuthenticateRequestV0{SaslAuthBytes: SaslOAuthBearer{}.ToBytes(token, "", make(map[string]string, 0))}
199209

200210
req := &protocol.Request{

0 commit comments

Comments
 (0)