Skip to content

Commit 0f7e8f4

Browse files
committed
integrate cert-source lib, add listener CLR file and cert rotation
1 parent adfee6a commit 0f7e8f4

File tree

33 files changed

+1964
-400
lines changed

33 files changed

+1964
-400
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
140140
--debug-enable Enable Debug endpoint
141141
--debug-listen-address string Debug listen address (default "0.0.0.0:6060")
142142
--default-listener-ip string Default listener IP (default "0.0.0.0")
143+
--deterministic-listeners Enable deterministic listeners (listener port = min port + broker id).
143144
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
144145
--dynamic-advertised-listener string Advertised address for dynamic listeners. If empty, default-listener-ip is used
145146
--dynamic-listeners-disable Disable dynamic listeners.
@@ -178,12 +179,14 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
178179
--proxy-listener-ca-chain-cert-file string PEM encoded CA's certificate file. If provided, client certificate is required and verified
179180
--proxy-listener-cert-file string PEM encoded file with server certificate
180181
--proxy-listener-cipher-suites strings List of supported cipher suites
182+
--proxy-listener-crl-file string PEM encoded X509 CRLs file
181183
--proxy-listener-curve-preferences strings List of curve preferences
182184
--proxy-listener-keep-alive duration Keep alive period for an active network connection. If zero, keep-alives are disabled (default 1m0s)
183185
--proxy-listener-key-file string PEM encoded file with private key for the server certificate
184186
--proxy-listener-key-password string Password to decrypt rsa private key
185187
--proxy-listener-read-buffer-size int Size of the operating system's receive buffer associated with the connection. If zero, system default is used
186188
--proxy-listener-tls-enable Whether or not to use TLS listener
189+
--proxy-listener-tls-refresh duration Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled
187190
--proxy-listener-tls-required-client-subject strings Required client certificate subject common name; example; s:/CN=[value]/C=[state]/C=[DE,PL] or r:/CN=[^val.{2}$]/C=[state]/C=[DE,PL]; check manual for more details
188191
--proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
189192
--proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096)
@@ -207,7 +210,9 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
207210
--tls-client-key-password string Password to decrypt rsa private key
208211
--tls-enable Whether or not to use TLS when connecting to the broker
209212
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name
213+
--tls-refresh duration Interval for refreshing client TLS certificates. If set to zero, the refresh watch is disabled
210214
--tls-same-client-cert-enable Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)
215+
--tls-system-cert-pool Use system pool for root CAs
211216
212217
### Usage example
213218

cmd/kafka-proxy/server.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,12 @@ func initFlags() {
103103
Server.Flags().DurationVar(&c.Proxy.ListenerKeepAlive, "proxy-listener-keep-alive", 60*time.Second, "Keep alive period for an active network connection. If zero, keep-alives are disabled")
104104

105105
Server.Flags().BoolVar(&c.Proxy.TLS.Enable, "proxy-listener-tls-enable", false, "Whether or not to use TLS listener")
106+
Server.Flags().DurationVar(&c.Proxy.TLS.Refresh, "proxy-listener-tls-refresh", 0*time.Second, "Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled")
106107
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCertFile, "proxy-listener-cert-file", "", "PEM encoded file with server certificate")
107108
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyFile, "proxy-listener-key-file", "", "PEM encoded file with private key for the server certificate")
108109
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyPassword, "proxy-listener-key-password", os.Getenv("PROXY_LISTENER_KEY_PASSWORD"), "Password to decrypt rsa private key")
109-
Server.Flags().StringVar(&c.Proxy.TLS.CAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
110+
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
111+
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCRLFile, "proxy-listener-crl-file", "", "PEM encoded X509 CRLs file")
110112
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCipherSuites, "proxy-listener-cipher-suites", []string{}, "List of supported cipher suites")
111113
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCurvePreferences, "proxy-listener-curve-preferences", []string{}, "List of curve preferences")
112114

@@ -153,11 +155,13 @@ func initFlags() {
153155

154156
// TLS
155157
Server.Flags().BoolVar(&c.Kafka.TLS.Enable, "tls-enable", false, "Whether or not to use TLS when connecting to the broker")
158+
Server.Flags().DurationVar(&c.Kafka.TLS.Refresh, "tls-refresh", 0*time.Second, "Interval for refreshing client TLS certificates. If set to zero, the refresh watch is disabled")
156159
Server.Flags().BoolVar(&c.Kafka.TLS.InsecureSkipVerify, "tls-insecure-skip-verify", false, "It controls whether a client verifies the server's certificate chain and host name")
157160
Server.Flags().StringVar(&c.Kafka.TLS.ClientCertFile, "tls-client-cert-file", "", "PEM encoded file with client certificate")
158161
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyFile, "tls-client-key-file", "", "PEM encoded file with private key for the client certificate")
159162
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", os.Getenv("TLS_CLIENT_KEY_PASSWORD"), "Password to decrypt rsa private key")
160163
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")
164+
Server.Flags().BoolVar(&c.Kafka.TLS.SystemCertPool, "tls-system-cert-pool", false, "Use system pool for root CAs")
161165

162166
//Same TLS client cert tls-same-client-cert-enable
163167
Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "tls-same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)")

config/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,12 @@ type Config struct {
8888

8989
TLS struct {
9090
Enable bool
91+
Refresh time.Duration
9192
ListenerCertFile string
9293
ListenerKeyFile string
9394
ListenerKeyPassword string
94-
CAChainCertFile string
95+
ListenerCAChainCertFile string
96+
ListenerCRLFile string
9597
ListenerCipherSuites []string
9698
ListenerCurvePreferences []string
9799
ClientCert struct {
@@ -145,11 +147,13 @@ type Config struct {
145147

146148
TLS struct {
147149
Enable bool
150+
Refresh time.Duration
148151
InsecureSkipVerify bool
149152
ClientCertFile string
150153
ClientKeyFile string
151154
ClientKeyPassword string
152155
CAChainCertFile string
156+
SystemCertPool bool
153157
SameClientCertEnable bool
154158
}
155159

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/fsnotify/fsnotify v1.4.9
1313
github.com/go-ldap/ldap/v3 v3.2.3
1414
github.com/google/uuid v1.6.0
15+
github.com/grepplabs/cert-source v0.0.8
1516
github.com/hashicorp/go-hclog v1.6.3
1617
github.com/hashicorp/go-multierror v0.0.0-20171204182908-b7773ae21874
1718
github.com/hashicorp/go-plugin v1.6.3
@@ -26,7 +27,6 @@ require (
2627
github.com/spf13/viper v1.0.2
2728
github.com/stretchr/testify v1.10.0
2829
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
29-
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76
3030
golang.org/x/net v0.34.0
3131
golang.org/x/oauth2 v0.24.0
3232
google.golang.org/api v0.126.0
@@ -82,6 +82,7 @@ require (
8282
github.com/spf13/jwalterweatherman v0.0.0-20180109140146-7c0cea34c8ec // indirect
8383
github.com/spf13/pflag v1.0.5 // indirect
8484
github.com/xdg/stringprep v1.0.0 // indirect
85+
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
8586
go.opencensus.io v0.24.0 // indirect
8687
golang.org/x/crypto v0.32.0 // indirect
8788
golang.org/x/sys v0.29.0 // indirect

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ github.com/googleapis/gax-go/v2 v2.11.0 h1:9V9PWXEsWnPpQhu/PeQIkS4eGzMlTLGgt80cU
137137
github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI=
138138
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
139139
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
140+
github.com/grepplabs/cert-source v0.0.8 h1:rcZeipbbljq46mMvw9yVF4FX/1zzLVfyenV3C07XS8g=
141+
github.com/grepplabs/cert-source v0.0.8/go.mod h1:gs3IoykME1cFfZ6/h6hch8yg8ktUInsR9OY2xSHA2r4=
140142
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
141143
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce h1:prjrVgOk2Yg6w+PflHoszQNLTUh4kaByUcEWM/9uin4=
142144
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
@@ -281,8 +283,8 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV
281283
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
282284
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
283285
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
284-
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 h1:tBiBTKHnIjovYoLX/TPkcf+OjqqKGQrPtGT3Foz+Pgo=
285-
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76/go.mod h1:SQliXeA7Dhkt//vS29v3zpbEwoa+zb2Cn5xj5uO4K5U=
286+
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
287+
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
286288
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
287289
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
288290
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=

proxy/client.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package proxy
22

33
import (
4-
"crypto/tls"
54
"crypto/x509"
65
"fmt"
76
"net"
@@ -46,20 +45,24 @@ type Client struct {
4645
}
4746

4847
func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
49-
tlsConfig, err := newTLSClientConfig(c)
50-
if err != nil {
51-
return nil, err
52-
}
53-
54-
var kafkaClientCert *x509.Certificate = nil
55-
if c.Kafka.TLS.SameClientCertEnable {
56-
kafkaClientCert, err = parseCertificate(c.Kafka.TLS.ClientCertFile)
48+
var (
49+
kafkaClientCert *x509.Certificate
50+
tlsConfigFunc TLSConfigFunc
51+
)
52+
if c.Kafka.TLS.Enable {
53+
var err error
54+
tlsConfigFunc, err = newTLSClientConfig(c)
5755
if err != nil {
5856
return nil, err
5957
}
58+
if c.Kafka.TLS.SameClientCertEnable {
59+
kafkaClientCert, err = parseCertificate(c.Kafka.TLS.ClientCertFile)
60+
if err != nil {
61+
return nil, err
62+
}
63+
}
6064
}
61-
62-
dialer, err := newDialer(c, tlsConfig)
65+
dialer, err := newDialer(c, tlsConfigFunc)
6366
if err != nil {
6467
return nil, err
6568
}
@@ -195,7 +198,7 @@ func getAddressToDialAddressMapping(cfg *config.Config) (map[string]config.DialA
195198
return addressToDialAddressMapping, nil
196199
}
197200

198-
func newDialer(c *config.Config, tlsConfig *tls.Config) (Dialer, error) {
201+
func newDialer(c *config.Config, tlsConfigFunc TLSConfigFunc) (Dialer, error) {
199202
directDialer := directDialer{
200203
dialTimeout: c.Kafka.DialTimeout,
201204
keepAlive: c.Kafka.KeepAlive,
@@ -230,13 +233,13 @@ func newDialer(c *config.Config, tlsConfig *tls.Config) (Dialer, error) {
230233
rawDialer = directDialer
231234
}
232235
if c.Kafka.TLS.Enable {
233-
if tlsConfig == nil {
234-
return nil, errors.New("tlsConfig must not be nil")
236+
if tlsConfigFunc == nil || tlsConfigFunc() == nil {
237+
return nil, errors.New("tlsConfigFunc must not be nil")
235238
}
236239
tlsDialer := tlsDialer{
237-
timeout: c.Kafka.DialTimeout,
238-
rawDialer: rawDialer,
239-
config: tlsConfig,
240+
timeout: c.Kafka.DialTimeout,
241+
rawDialer: rawDialer,
242+
configFunc: tlsConfigFunc,
240243
}
241244
return tlsDialer, nil
242245
}

proxy/dial.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,15 @@ func (d socks5Dialer) Dial(network, addr string) (net.Conn, error) {
6969
}
7070

7171
type tlsDialer struct {
72-
timeout time.Duration
73-
rawDialer Dialer
74-
config *tls.Config
72+
timeout time.Duration
73+
rawDialer Dialer
74+
configFunc TLSConfigFunc
7575
}
7676

7777
// see tls.DialWithDialer
7878
func (d tlsDialer) Dial(network, addr string) (net.Conn, error) {
79-
if d.config == nil {
79+
config := d.configFunc()
80+
if config == nil {
8081
return nil, errors.New("tlsConfig must not be nil")
8182
}
8283
if d.rawDialer == nil {
@@ -106,8 +107,6 @@ func (d tlsDialer) Dial(network, addr string) (net.Conn, error) {
106107
}
107108
hostname := addr[:colonPos]
108109

109-
config := d.config
110-
111110
// If no ServerName is set, infer the ServerName
112111
// from the hostname we're connecting to.
113112
if config.ServerName == "" {

0 commit comments

Comments
 (0)