Skip to content

Commit 33609c6

Browse files
committed
Implement same client cert check feature
1 parent 1072f20 commit 33609c6

File tree

10 files changed

+382
-22
lines changed

10 files changed

+382
-22
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,7 @@ Session.vim
6363
*~
6464
# Auto-generated tag files
6565
tags
66+
67+
#IntelliJ
68+
kafka-proxy.iml
69+
vendor/

Gopkg.lock

Lines changed: 43 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ See:
133133
--tls-client-key-password string Password to decrypt rsa private key
134134
--tls-enable Whether or not to use TLS when connecting to the broker
135135
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name
136+
--same-client-cert-enable Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate matches brokers client cert (tls-client-cert-file)
136137
137138
### Usage example
138139
@@ -209,6 +210,22 @@ SASL authentication is performed by the proxy. SASL authentication is enabled on
209210
--auth-local-param "--claim-sub=alice" \
210211
--auth-local-param "--claim-sub=bob" \
211212
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"
213+
214+
### Same client certificate check enabled example
215+
216+
Validate that client certificate used by proxy client is exactly the same as client certificate in authentication initiated by proxy
217+
218+
kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
219+
--tls-enable \
220+
--tls-client-cert-file client.crt \
221+
--tls-client-key-file client.pem \
222+
--tls-client-key-password changeit \
223+
--proxy-listener-tls-enable \
224+
--proxy-listener-key-file server.pem \
225+
--proxy-listener-cert-file server.crt \
226+
--proxy-listener-key-password changeit \
227+
--proxy-listener-ca-chain-cert-file ca.crt \
228+
--same-client-cert-enable
212229
213230
### Kafka Gateway example
214231

cmd/kafka-proxy/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ func initFlags() {
148148
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", "", "Password to decrypt rsa private key")
149149
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")
150150

151+
//Same TLS client cert
152+
Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate matches brokers client cert (tls-client-cert-file)")
153+
151154
// SASL by Proxy
152155
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL")
153156
Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name")

cmd/kafka-proxy/server_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,93 @@ func TestExternalServersMappingFromEnv(t *testing.T) {
147147
a.Equal(c.Proxy.ExternalServers[1].AdvertisedAddress, "kafka-5.grepplabs.com:9092")
148148

149149
}
150+
151+
func TestSameClientCertEnabledWithRequiredFlags(t *testing.T) {
152+
153+
setupBootstrapServersMappingTest()
154+
155+
args := []string{"cobra.test",
156+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
157+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
158+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
159+
//same client enabled attributes
160+
"--same-client-cert-enable", "",
161+
"--proxy-listener-tls-enable", "",
162+
"--tls-enable", "",
163+
"--tls-client-cert-file", "client.crt",
164+
//other necessary tls arguments
165+
"--proxy-listener-key-file", "server.pem",
166+
"--proxy-listener-cert-file", "server.crt",
167+
}
168+
169+
_ = Server.ParseFlags(args)
170+
err := Server.PreRunE(nil, args)
171+
a := assert.New(t)
172+
173+
a.Nil(err)
174+
}
175+
176+
func TestSameClientCertEnabledWithMissingFlags(t *testing.T) {
177+
178+
expectedErrorMsg := "SameClientCertEnable requires TLS to be enabled on both proxy and kafka connections and client cert file on kafka connection"
179+
180+
disabledProxyTLS := []string{"cobra.test",
181+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
182+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
183+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
184+
//same client enabled attributes
185+
"--same-client-cert-enable", "",
186+
"--tls-enable", "",
187+
"--tls-client-cert-file", "client.crt",
188+
//other necessary tls arguments
189+
"--proxy-listener-key-file", "server.pem",
190+
"--proxy-listener-cert-file", "server.crt",
191+
}
192+
193+
disabledTLS := []string{"cobra.test",
194+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
195+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
196+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
197+
//same client enabled attributes
198+
"--same-client-cert-enable", "",
199+
"--proxy-listener-tls-enable", "",
200+
//other necessary tls arguments
201+
"--proxy-listener-key-file", "server.pem",
202+
"--proxy-listener-cert-file", "server.crt",
203+
}
204+
205+
missingTLSClientCert := []string{"cobra.test",
206+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
207+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
208+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
209+
//same client enabled attributes
210+
"--same-client-cert-enable", "",
211+
"--proxy-listener-tls-enable", "",
212+
"--tls-enable", "",
213+
//other necessary tls arguments
214+
"--proxy-listener-key-file", "server.pem",
215+
"--proxy-listener-cert-file", "server.crt",
216+
}
217+
218+
t.Run("DisabledProxyTLS", func(t *testing.T) {
219+
serverPreRunFailure(t, disabledProxyTLS, expectedErrorMsg)
220+
})
221+
222+
t.Run("DisabledTLS", func(t *testing.T) {
223+
serverPreRunFailure(t, disabledTLS, expectedErrorMsg)
224+
})
225+
226+
t.Run("MissingTLSClientCert", func(t *testing.T) {
227+
serverPreRunFailure(t, missingTLSClientCert, expectedErrorMsg)
228+
})
229+
}
230+
231+
func serverPreRunFailure(t *testing.T, cmdLineFlags []string, expectedErrorMsg string) {
232+
setupBootstrapServersMappingTest()
233+
234+
_ = Server.ParseFlags(cmdLineFlags)
235+
err := Server.PreRunE(nil, cmdLineFlags)
236+
a := assert.New(t)
237+
238+
a.Equal(err.Error(), expectedErrorMsg)
239+
}

config/config.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,13 @@ type Config struct {
113113
ConnectionWriteBufferSize int // SO_SNDBUF
114114

115115
TLS struct {
116-
Enable bool
117-
InsecureSkipVerify bool
118-
ClientCertFile string
119-
ClientKeyFile string
120-
ClientKeyPassword string
121-
CAChainCertFile string
116+
Enable bool
117+
InsecureSkipVerify bool
118+
ClientCertFile string
119+
ClientKeyFile string
120+
ClientKeyPassword string
121+
CAChainCertFile string
122+
SameClientCertEnable bool
122123
}
123124

124125
SASL struct {
@@ -315,6 +316,9 @@ func (c *Config) Validate() error {
315316
if c.Proxy.TLS.Enable && (c.Proxy.TLS.ListenerKeyFile == "" || c.Proxy.TLS.ListenerCertFile == "") {
316317
return errors.New("ListenerKeyFile and ListenerCertFile are required when Proxy TLS is enabled")
317318
}
319+
if c.Kafka.TLS.SameClientCertEnable && (!c.Kafka.TLS.Enable || c.Kafka.TLS.ClientCertFile == "" || !c.Proxy.TLS.Enable) {
320+
return errors.New("ClientCertFile is required on Kafka TLS and TLS must be enabled on both Proxy and Kafka connections when SameClientCertEnable is enabled")
321+
}
318322
if c.Auth.Local.Enable && c.Auth.Local.Command == "" {
319323
return errors.New("Command is required when Auth.Local.Enable is enabled")
320324
}

proxy/client.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package proxy
22

33
import (
44
"crypto/tls"
5+
"crypto/x509"
56
"fmt"
67
"github.com/grepplabs/kafka-proxy/config"
78
"github.com/grepplabs/kafka-proxy/pkg/apis"
@@ -39,13 +40,24 @@ type Client struct {
3940
authClient *AuthClient
4041

4142
dialAddressMapping map[string]config.DialAddressMapping
43+
44+
kafkaClientCert *x509.Certificate
4245
}
4346

4447
func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
4548
tlsConfig, err := newTLSClientConfig(c)
4649
if err != nil {
4750
return nil, err
4851
}
52+
53+
var kafkaClientCert *x509.Certificate = nil
54+
if c.Kafka.TLS.SameClientCertEnable {
55+
kafkaClientCert, err = parseCertificate(c.Kafka.TLS.ClientCertFile)
56+
if err != nil {
57+
return nil, err
58+
}
59+
}
60+
4961
dialer, err := newDialer(c, tlsConfig)
5062
if err != nil {
5163
return nil, err
@@ -145,6 +157,7 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
145157
ForbiddenApiKeys: forbiddenApiKeys,
146158
},
147159
dialAddressMapping: dialAddressMapping,
160+
kafkaClientCert: kafkaClientCert,
148161
}, nil
149162
}
150163

@@ -242,6 +255,17 @@ func (c *Client) Close() {
242255
}
243256

244257
func (c *Client) handleConn(conn Conn) {
258+
localConn := conn.LocalConnection
259+
if c.kafkaClientCert != nil {
260+
err := handshakeAsTLSAndValidateClientCert(localConn, c.kafkaClientCert, c.config.Kafka.DialTimeout)
261+
262+
if err != nil {
263+
logrus.Info(err.Error())
264+
_ = localConn.Close()
265+
return
266+
}
267+
}
268+
245269
proxyConnectionsTotal.WithLabelValues(conn.BrokerAddress).Inc()
246270

247271
dialAddress := conn.BrokerAddress

proxy/tls.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"github.com/klauspost/cpuid"
99
"github.com/pkg/errors"
1010
"io/ioutil"
11+
"net"
1112
"strings"
13+
"time"
1214
)
1315

1416
var (
@@ -58,6 +60,8 @@ var (
5860
"ECDHE-RSA-3DES-EDE-CBC-SHA": tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
5961
"RSA-3DES-EDE-CBC-SHA": tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
6062
}
63+
64+
zeroTime = time.Time{}
6165
)
6266

6367
func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) {
@@ -225,3 +229,79 @@ func decryptPEM(pemData []byte, password string) ([]byte, error) {
225229
}
226230
return pemData, nil
227231
}
232+
233+
func parseCertificate(certFile string) (*x509.Certificate, error) {
234+
235+
content, readErr := ioutil.ReadFile(certFile)
236+
237+
if readErr != nil {
238+
return nil, errors.Errorf("Failed to read file from location '%s'", certFile)
239+
}
240+
241+
block, _ := pem.Decode(content)
242+
243+
cert, parseErr := x509.ParseCertificate(block.Bytes)
244+
245+
if parseErr != nil {
246+
return nil, errors.Errorf("Failed to parse certificate file from location '%s'", certFile)
247+
}
248+
249+
return cert, nil
250+
}
251+
252+
func handshakeAsTLSAndValidateClientCert(conn net.Conn, expectedCert *x509.Certificate, handshakeTimeout time.Duration) error {
253+
tlsConn, ok := conn.(*tls.Conn)
254+
if !ok {
255+
return errors.New("Unable to cast connection to TLS when validating client cert")
256+
}
257+
258+
err := handshakeTLSConn(tlsConn, handshakeTimeout)
259+
if err != nil {
260+
return err
261+
}
262+
263+
actualClientCert := filterClientCertificate(tlsConn.ConnectionState().PeerCertificates)
264+
265+
result := validateClientCert(actualClientCert, expectedCert)
266+
267+
return result
268+
}
269+
270+
func handshakeTLSConn(tlsConn *tls.Conn, timeout time.Duration) error {
271+
err := tlsConn.SetDeadline(time.Now().Add(timeout))
272+
if err != nil {
273+
return errors.Errorf("Failed to set deadline with handshake timeout in seconds %f on connection: %v", timeout.Seconds(), err)
274+
}
275+
276+
err = tlsConn.Handshake()
277+
if err != nil {
278+
return errors.Errorf("TLS handshake failed when exchanging client certificates: %v", err)
279+
}
280+
281+
err = tlsConn.SetDeadline(zeroTime)
282+
if err != nil {
283+
return errors.Errorf("Failed to reset deadline on connection: %v", err)
284+
}
285+
286+
return err
287+
}
288+
289+
func filterClientCertificate(peerCertificates []*x509.Certificate) *x509.Certificate {
290+
for _, v := range peerCertificates {
291+
if !v.IsCA {
292+
return v
293+
}
294+
}
295+
return nil
296+
}
297+
298+
func validateClientCert(actualClientCert *x509.Certificate, expectedCert *x509.Certificate) error {
299+
if actualClientCert == nil {
300+
return errors.New("Client cert not found in TLS connection")
301+
}
302+
303+
if !actualClientCert.Equal(expectedCert) {
304+
return errors.New("Client cert sent by proxy client does not match brokers client cert (tls-client-cert-file)")
305+
}
306+
return nil
307+
}

0 commit comments

Comments
 (0)