Skip to content

Commit 0f429f5

Browse files
authored
Merge pull request #42 from mgusiew-guide/SameClientCertEnable
Implement same client cert check feature
2 parents 6f5c93c + 33609c6 commit 0f429f5

File tree

10 files changed

+382
-22
lines changed

10 files changed

+382
-22
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,7 @@ Session.vim
6363
*~
6464
# Auto-generated tag files
6565
tags
66+
67+
#IntelliJ
68+
kafka-proxy.iml
69+
vendor/

Gopkg.lock

Lines changed: 43 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ See:
137137
--tls-client-key-password string Password to decrypt rsa private key
138138
--tls-enable Whether or not to use TLS when connecting to the broker
139139
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name
140+
--same-client-cert-enable Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate matches brokers client cert (tls-client-cert-file)
140141
141142
### Usage example
142143
@@ -213,6 +214,22 @@ SASL authentication is performed by the proxy. SASL authentication is enabled on
213214
--auth-local-param "--claim-sub=alice" \
214215
--auth-local-param "--claim-sub=bob" \
215216
--bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"
217+
218+
### Same client certificate check enabled example
219+
220+
Validate that client certificate used by proxy client is exactly the same as client certificate in authentication initiated by proxy
221+
222+
kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
223+
--tls-enable \
224+
--tls-client-cert-file client.crt \
225+
--tls-client-key-file client.pem \
226+
--tls-client-key-password changeit \
227+
--proxy-listener-tls-enable \
228+
--proxy-listener-key-file server.pem \
229+
--proxy-listener-cert-file server.crt \
230+
--proxy-listener-key-password changeit \
231+
--proxy-listener-ca-chain-cert-file ca.crt \
232+
--same-client-cert-enable
216233
217234
### Kafka Gateway example
218235

cmd/kafka-proxy/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ func initFlags() {
149149
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", "", "Password to decrypt rsa private key")
150150
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")
151151

152+
//Same TLS client cert
153+
Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate matches brokers client cert (tls-client-cert-file)")
154+
152155
// SASL by Proxy
153156
Server.Flags().BoolVar(&c.Kafka.SASL.Enable, "sasl-enable", false, "Connect using SASL")
154157
Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name")

cmd/kafka-proxy/server_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,93 @@ func TestExternalServersMappingFromEnv(t *testing.T) {
147147
a.Equal(c.Proxy.ExternalServers[1].AdvertisedAddress, "kafka-5.grepplabs.com:9092")
148148

149149
}
150+
151+
func TestSameClientCertEnabledWithRequiredFlags(t *testing.T) {
152+
153+
setupBootstrapServersMappingTest()
154+
155+
args := []string{"cobra.test",
156+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
157+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
158+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
159+
//same client enabled attributes
160+
"--same-client-cert-enable", "",
161+
"--proxy-listener-tls-enable", "",
162+
"--tls-enable", "",
163+
"--tls-client-cert-file", "client.crt",
164+
//other necessary tls arguments
165+
"--proxy-listener-key-file", "server.pem",
166+
"--proxy-listener-cert-file", "server.crt",
167+
}
168+
169+
_ = Server.ParseFlags(args)
170+
err := Server.PreRunE(nil, args)
171+
a := assert.New(t)
172+
173+
a.Nil(err)
174+
}
175+
176+
func TestSameClientCertEnabledWithMissingFlags(t *testing.T) {
177+
178+
expectedErrorMsg := "SameClientCertEnable requires TLS to be enabled on both proxy and kafka connections and client cert file on kafka connection"
179+
180+
disabledProxyTLS := []string{"cobra.test",
181+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
182+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
183+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
184+
//same client enabled attributes
185+
"--same-client-cert-enable", "",
186+
"--tls-enable", "",
187+
"--tls-client-cert-file", "client.crt",
188+
//other necessary tls arguments
189+
"--proxy-listener-key-file", "server.pem",
190+
"--proxy-listener-cert-file", "server.crt",
191+
}
192+
193+
disabledTLS := []string{"cobra.test",
194+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
195+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
196+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
197+
//same client enabled attributes
198+
"--same-client-cert-enable", "",
199+
"--proxy-listener-tls-enable", "",
200+
//other necessary tls arguments
201+
"--proxy-listener-key-file", "server.pem",
202+
"--proxy-listener-cert-file", "server.crt",
203+
}
204+
205+
missingTLSClientCert := []string{"cobra.test",
206+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
207+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
208+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
209+
//same client enabled attributes
210+
"--same-client-cert-enable", "",
211+
"--proxy-listener-tls-enable", "",
212+
"--tls-enable", "",
213+
//other necessary tls arguments
214+
"--proxy-listener-key-file", "server.pem",
215+
"--proxy-listener-cert-file", "server.crt",
216+
}
217+
218+
t.Run("DisabledProxyTLS", func(t *testing.T) {
219+
serverPreRunFailure(t, disabledProxyTLS, expectedErrorMsg)
220+
})
221+
222+
t.Run("DisabledTLS", func(t *testing.T) {
223+
serverPreRunFailure(t, disabledTLS, expectedErrorMsg)
224+
})
225+
226+
t.Run("MissingTLSClientCert", func(t *testing.T) {
227+
serverPreRunFailure(t, missingTLSClientCert, expectedErrorMsg)
228+
})
229+
}
230+
231+
func serverPreRunFailure(t *testing.T, cmdLineFlags []string, expectedErrorMsg string) {
232+
setupBootstrapServersMappingTest()
233+
234+
_ = Server.ParseFlags(cmdLineFlags)
235+
err := Server.PreRunE(nil, cmdLineFlags)
236+
a := assert.New(t)
237+
238+
a.Equal(err.Error(), expectedErrorMsg)
239+
}

config/config.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,13 @@ type Config struct {
116116
ConnectionWriteBufferSize int // SO_SNDBUF
117117

118118
TLS struct {
119-
Enable bool
120-
InsecureSkipVerify bool
121-
ClientCertFile string
122-
ClientKeyFile string
123-
ClientKeyPassword string
124-
CAChainCertFile string
119+
Enable bool
120+
InsecureSkipVerify bool
121+
ClientCertFile string
122+
ClientKeyFile string
123+
ClientKeyPassword string
124+
CAChainCertFile string
125+
SameClientCertEnable bool
125126
}
126127

127128
SASL struct {
@@ -318,6 +319,9 @@ func (c *Config) Validate() error {
318319
if c.Proxy.TLS.Enable && (c.Proxy.TLS.ListenerKeyFile == "" || c.Proxy.TLS.ListenerCertFile == "") {
319320
return errors.New("ListenerKeyFile and ListenerCertFile are required when Proxy TLS is enabled")
320321
}
322+
if c.Kafka.TLS.SameClientCertEnable && (!c.Kafka.TLS.Enable || c.Kafka.TLS.ClientCertFile == "" || !c.Proxy.TLS.Enable) {
323+
return errors.New("ClientCertFile is required on Kafka TLS and TLS must be enabled on both Proxy and Kafka connections when SameClientCertEnable is enabled")
324+
}
321325
if c.Auth.Local.Enable && c.Auth.Local.Command == "" {
322326
return errors.New("Command is required when Auth.Local.Enable is enabled")
323327
}

proxy/client.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package proxy
22

33
import (
44
"crypto/tls"
5+
"crypto/x509"
56
"fmt"
67
"github.com/grepplabs/kafka-proxy/config"
78
"github.com/grepplabs/kafka-proxy/pkg/apis"
@@ -39,13 +40,24 @@ type Client struct {
3940
authClient *AuthClient
4041

4142
dialAddressMapping map[string]config.DialAddressMapping
43+
44+
kafkaClientCert *x509.Certificate
4245
}
4346

4447
func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
4548
tlsConfig, err := newTLSClientConfig(c)
4649
if err != nil {
4750
return nil, err
4851
}
52+
53+
var kafkaClientCert *x509.Certificate = nil
54+
if c.Kafka.TLS.SameClientCertEnable {
55+
kafkaClientCert, err = parseCertificate(c.Kafka.TLS.ClientCertFile)
56+
if err != nil {
57+
return nil, err
58+
}
59+
}
60+
4961
dialer, err := newDialer(c, tlsConfig)
5062
if err != nil {
5163
return nil, err
@@ -145,6 +157,7 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
145157
ForbiddenApiKeys: forbiddenApiKeys,
146158
},
147159
dialAddressMapping: dialAddressMapping,
160+
kafkaClientCert: kafkaClientCert,
148161
}, nil
149162
}
150163

@@ -242,6 +255,17 @@ func (c *Client) Close() {
242255
}
243256

244257
func (c *Client) handleConn(conn Conn) {
258+
localConn := conn.LocalConnection
259+
if c.kafkaClientCert != nil {
260+
err := handshakeAsTLSAndValidateClientCert(localConn, c.kafkaClientCert, c.config.Kafka.DialTimeout)
261+
262+
if err != nil {
263+
logrus.Info(err.Error())
264+
_ = localConn.Close()
265+
return
266+
}
267+
}
268+
245269
proxyConnectionsTotal.WithLabelValues(conn.BrokerAddress).Inc()
246270

247271
dialAddress := conn.BrokerAddress

proxy/tls.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"github.com/klauspost/cpuid"
99
"github.com/pkg/errors"
1010
"io/ioutil"
11+
"net"
1112
"strings"
13+
"time"
1214
)
1315

1416
var (
@@ -58,6 +60,8 @@ var (
5860
"ECDHE-RSA-3DES-EDE-CBC-SHA": tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
5961
"RSA-3DES-EDE-CBC-SHA": tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
6062
}
63+
64+
zeroTime = time.Time{}
6165
)
6266

6367
func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) {
@@ -224,3 +228,79 @@ func decryptPEM(pemData []byte, password string) ([]byte, error) {
224228
}
225229
return pemData, nil
226230
}
231+
232+
func parseCertificate(certFile string) (*x509.Certificate, error) {
233+
234+
content, readErr := ioutil.ReadFile(certFile)
235+
236+
if readErr != nil {
237+
return nil, errors.Errorf("Failed to read file from location '%s'", certFile)
238+
}
239+
240+
block, _ := pem.Decode(content)
241+
242+
cert, parseErr := x509.ParseCertificate(block.Bytes)
243+
244+
if parseErr != nil {
245+
return nil, errors.Errorf("Failed to parse certificate file from location '%s'", certFile)
246+
}
247+
248+
return cert, nil
249+
}
250+
251+
func handshakeAsTLSAndValidateClientCert(conn net.Conn, expectedCert *x509.Certificate, handshakeTimeout time.Duration) error {
252+
tlsConn, ok := conn.(*tls.Conn)
253+
if !ok {
254+
return errors.New("Unable to cast connection to TLS when validating client cert")
255+
}
256+
257+
err := handshakeTLSConn(tlsConn, handshakeTimeout)
258+
if err != nil {
259+
return err
260+
}
261+
262+
actualClientCert := filterClientCertificate(tlsConn.ConnectionState().PeerCertificates)
263+
264+
result := validateClientCert(actualClientCert, expectedCert)
265+
266+
return result
267+
}
268+
269+
func handshakeTLSConn(tlsConn *tls.Conn, timeout time.Duration) error {
270+
err := tlsConn.SetDeadline(time.Now().Add(timeout))
271+
if err != nil {
272+
return errors.Errorf("Failed to set deadline with handshake timeout in seconds %f on connection: %v", timeout.Seconds(), err)
273+
}
274+
275+
err = tlsConn.Handshake()
276+
if err != nil {
277+
return errors.Errorf("TLS handshake failed when exchanging client certificates: %v", err)
278+
}
279+
280+
err = tlsConn.SetDeadline(zeroTime)
281+
if err != nil {
282+
return errors.Errorf("Failed to reset deadline on connection: %v", err)
283+
}
284+
285+
return err
286+
}
287+
288+
func filterClientCertificate(peerCertificates []*x509.Certificate) *x509.Certificate {
289+
for _, v := range peerCertificates {
290+
if !v.IsCA {
291+
return v
292+
}
293+
}
294+
return nil
295+
}
296+
297+
func validateClientCert(actualClientCert *x509.Certificate, expectedCert *x509.Certificate) error {
298+
if actualClientCert == nil {
299+
return errors.New("Client cert not found in TLS connection")
300+
}
301+
302+
if !actualClientCert.Equal(expectedCert) {
303+
return errors.New("Client cert sent by proxy client does not match brokers client cert (tls-client-cert-file)")
304+
}
305+
return nil
306+
}

0 commit comments

Comments
 (0)