Skip to content

Commit 9d2cd3c

Browse files
committed
Connect to Kafka through SOCKS5 Proxy
1 parent 701a917 commit 9d2cd3c

File tree

10 files changed

+707
-22
lines changed

10 files changed

+707
-22
lines changed

Gopkg.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ See:
121121
--sasl-jaas-config-file string Location of JAAS config file with SASL username and password
122122
--sasl-password string SASL user password
123123
--sasl-username string SASL user name
124+
--socks5-address string Address of SOCKS5 proxy to connect through when connecting to kafka brokers
125+
--socks5-password string Password for SOCKS5 proxy Username/Password Authentication
126+
--socks5-username string Username for SOCKS5 proxy Username/Password Authentication
124127
--tls-ca-chain-cert-file string PEM encoded CA's certificate file
125128
--tls-client-cert-file string PEM encoded file with client certificate
126129
--tls-client-key-file string PEM encoded file with private key for the client certificate
@@ -323,7 +326,8 @@ spec:
323326
* [X] Set TLS server CipherSuites and CurvePreferences
324327
* [ ] Optional ApiVersionsRequest before Local SASL Authentication Sequence
325328
* [ ] SaslHandshakeRequest v1 - Kafka 1.1.0
326-
* [ ] Socks5 proxy and http proxy (googleid) for outgoing connections
329+
* [X] Connect to Kafka through SOCKS5 Proxy
330+
* [ ] Proxy support for outgoing HTTP/HTTPS connections (googleid)
327331
* [ ] Performance tests and tuning
328332
* [ ] Socket buffer sizing e.g. SO_RCVBUF = 32768, SO_SNDBUF = 131072
329333
* [ ] Kafka connect tests

cmd/kafka-proxy/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ func init() {
148148
// Logging
149149
Server.Flags().StringVar(&c.Log.Format, "log-format", "text", "Log format text or json")
150150
Server.Flags().StringVar(&c.Log.Level, "log-level", "info", "Log level debug, info, warning, error, fatal or panic")
151+
152+
// Socks5 to Kafka
153+
Server.Flags().StringVar(&c.Socks5.ProxyAddress, "socks5-address", "", "Address of SOCKS5 proxy to connect through when connecting to kafka brokers")
154+
Server.Flags().StringVar(&c.Socks5.Username, "socks5-username", "", "Username for SOCKS5 proxy Username/Password Authentication")
155+
Server.Flags().StringVar(&c.Socks5.Password, "socks5-password", "", "Password for SOCKS5 proxy Username/Password Authentication")
151156
}
152157

153158
func Run(_ *cobra.Command, _ []string) {

config/config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ type Config struct {
120120
JaasConfigFile string
121121
}
122122
}
123+
Socks5 struct {
124+
ProxyAddress string
125+
Username string
126+
Password string
127+
}
123128
}
124129

125130
func (c *Config) InitBootstrapServers(bootstrapServersMapping []string) (err error) {
@@ -261,5 +266,21 @@ func (c *Config) Validate() error {
261266
if c.Auth.Gateway.Server.Enable && c.Auth.Gateway.Server.Timeout <= 0 {
262267
return errors.New("Auth.Gateway.Server.Timeout must be greater than 0")
263268
}
269+
if c.Socks5.ProxyAddress == "" && (c.Socks5.Username != "" || c.Socks5.Password != "") {
270+
return errors.New("Socks5.ProxyAddress must not be empty when Socks5 Username/Password is provided")
271+
}
272+
if (c.Socks5.Username != "" && c.Socks5.Password == "") || (c.Socks5.Username == "" && c.Socks5.Password != "") {
273+
return errors.New("Both Socks5 Username and Password must be provided provided")
274+
}
275+
if len(c.Socks5.Username) > 255 || len(c.Socks5.Password) > 255 {
276+
// RFC1929
277+
return errors.New("Max length of Socks5 Username/Password is 255 chars")
278+
}
279+
if c.Socks5.ProxyAddress != "" {
280+
if _, _, err := util.SplitHostPort(c.Socks5.ProxyAddress); err != nil {
281+
return err
282+
}
283+
}
284+
264285
return nil
265286
}

proxy/client.go

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Client struct {
2828
// Config of Proxy request-response processor (instance p)
2929
processorConfig ProcessorConfig
3030

31-
tlsConfig *tls.Config
31+
dialer Dialer
3232
tcpConnOptions TCPConnOptions
3333

3434
stopRun chan struct{}
@@ -43,6 +43,10 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
4343
if err != nil {
4444
return nil, err
4545
}
46+
dialer, err := newDialer(c, tlsConfig)
47+
if err != nil {
48+
return nil, err
49+
}
4650
tcpConnOptions := TCPConnOptions{
4751
KeepAlive: c.Kafka.KeepAlive,
4852
WriteBufferSize: c.Kafka.ConnectionWriteBufferSize,
@@ -67,7 +71,7 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
6771
return nil, errors.New("Auth.Gateway.Server.Enable is enabled but tokenInfo is nil")
6872
}
6973

70-
return &Client{conns: conns, config: c, tlsConfig: tlsConfig, tcpConnOptions: tcpConnOptions, stopRun: make(chan struct{}, 1),
74+
return &Client{conns: conns, config: c, dialer: dialer, tcpConnOptions: tcpConnOptions, stopRun: make(chan struct{}, 1),
7175
saslPlainAuth: &SASLPlainAuth{
7276
clientID: c.Kafka.ClientID,
7377
writeTimeout: c.Kafka.WriteTimeout,
@@ -104,6 +108,40 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
104108
}}, nil
105109
}
106110

111+
func newDialer(c *config.Config, tlsConfig *tls.Config) (Dialer, error) {
112+
var rawDialer Dialer
113+
if c.Socks5.ProxyAddress != "" {
114+
logrus.Infof("Kafka clients will connect through the SOCKS5 proxy %s", c.Socks5.ProxyAddress)
115+
rawDialer = &socks5Dialer{
116+
directDialer: directDialer{
117+
dialTimeout: c.Kafka.DialTimeout,
118+
keepAlive: c.Kafka.KeepAlive,
119+
},
120+
proxyNetwork: "tcp",
121+
proxyAddr: c.Socks5.ProxyAddress,
122+
username: c.Socks5.Username,
123+
password: c.Socks5.Password,
124+
}
125+
} else {
126+
rawDialer = directDialer{
127+
dialTimeout: c.Kafka.DialTimeout,
128+
keepAlive: c.Kafka.KeepAlive,
129+
}
130+
}
131+
if c.Kafka.TLS.Enable {
132+
if tlsConfig == nil {
133+
return nil, errors.New("tlsConfig must not be nil")
134+
}
135+
tlsDialer := tlsDialer{
136+
timeout: c.Kafka.DialTimeout,
137+
rawDialer: rawDialer,
138+
config: tlsConfig,
139+
}
140+
return tlsDialer, nil
141+
}
142+
return rawDialer, nil
143+
}
144+
107145
// Run causes the client to start waiting for new connections to connSrc and
108146
// proxy them to the destination instance. It blocks until connSrc is closed.
109147
func (c *Client) Run(connSrc <-chan Conn) error {
@@ -136,7 +174,7 @@ func (c *Client) Close() {
136174
func (c *Client) handleConn(conn Conn) {
137175
proxyConnectionsTotal.WithLabelValues(conn.BrokerAddress).Inc()
138176

139-
server, err := c.DialAndAuth(conn.BrokerAddress, c.tlsConfig)
177+
server, err := c.DialAndAuth(conn.BrokerAddress)
140178
if err != nil {
141179
logrus.Infof("couldn't connect to %s: %v", conn.BrokerAddress, err)
142180
conn.LocalConnection.Close()
@@ -148,15 +186,15 @@ func (c *Client) handleConn(conn Conn) {
148186
}
149187
}
150188
c.conns.Add(conn.BrokerAddress, conn.LocalConnection)
151-
localDesc := "local connection on " + conn.LocalConnection.LocalAddr().String() + " from " + conn.LocalConnection.RemoteAddr().String()
189+
localDesc := "local connection on " + conn.LocalConnection.LocalAddr().String() + " from " + conn.LocalConnection.RemoteAddr().String() + " (" + conn.BrokerAddress + ")"
152190
copyThenClose(c.processorConfig, server, conn.LocalConnection, conn.BrokerAddress, conn.BrokerAddress, localDesc)
153191
if err := c.conns.Remove(conn.BrokerAddress, conn.LocalConnection); err != nil {
154192
logrus.Info(err)
155193
}
156194
}
157195

158-
func (c *Client) DialAndAuth(brokerAddress string, tlsConfig *tls.Config) (net.Conn, error) {
159-
conn, err := c.dial(brokerAddress, tlsConfig)
196+
func (c *Client) DialAndAuth(brokerAddress string) (net.Conn, error) {
197+
conn, err := c.dialer.Dial("tcp", brokerAddress)
160198
if err != nil {
161199
return nil, err
162200
}
@@ -167,20 +205,6 @@ func (c *Client) DialAndAuth(brokerAddress string, tlsConfig *tls.Config) (net.C
167205
return conn, nil
168206
}
169207

170-
func (c *Client) dial(brokerAddress string, tlsConfig *tls.Config) (net.Conn, error) {
171-
dialer := net.Dialer{
172-
Timeout: c.config.Kafka.DialTimeout,
173-
KeepAlive: c.config.Kafka.KeepAlive,
174-
}
175-
if c.config.Kafka.TLS.Enable {
176-
if tlsConfig == nil {
177-
return nil, errors.New("tlsConfig must not be nil")
178-
}
179-
return tls.DialWithDialer(&dialer, "tcp", brokerAddress, tlsConfig)
180-
}
181-
return dialer.Dial("tcp", brokerAddress)
182-
}
183-
184208
func (c *Client) auth(conn net.Conn) error {
185209
if c.config.Auth.Gateway.Client.Enable {
186210
if err := c.authClient.sendAndReceiveGatewayAuth(conn); err != nil {

proxy/dial.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package proxy
2+
3+
import (
4+
"crypto/tls"
5+
"github.com/pkg/errors"
6+
"golang.org/x/net/proxy"
7+
"net"
8+
"strings"
9+
"time"
10+
)
11+
12+
type Dialer interface {
13+
Dial(network, addr string) (c net.Conn, err error)
14+
}
15+
16+
type directDialer struct {
17+
dialTimeout time.Duration
18+
keepAlive time.Duration
19+
}
20+
21+
func (d directDialer) Dial(network, addr string) (net.Conn, error) {
22+
dialer := net.Dialer{
23+
Timeout: d.dialTimeout,
24+
KeepAlive: d.keepAlive,
25+
}
26+
return dialer.Dial(network, addr)
27+
}
28+
29+
type socks5Dialer struct {
30+
directDialer directDialer
31+
proxyNetwork, proxyAddr string
32+
username, password string
33+
}
34+
35+
func (d socks5Dialer) Dial(network, addr string) (net.Conn, error) {
36+
if d.proxyNetwork == "" || d.proxyAddr == "" {
37+
return nil, errors.New("socks5 proxy network and addr must be not empty")
38+
}
39+
var auth *proxy.Auth
40+
if d.username != "" && d.password != "" {
41+
auth = &proxy.Auth{
42+
User: d.username,
43+
Password: d.password,
44+
}
45+
}
46+
socks5Dialer, err := proxy.SOCKS5(d.proxyNetwork, d.proxyAddr, auth, d.directDialer)
47+
if err != nil {
48+
return nil, err
49+
}
50+
conn, err := socks5Dialer.Dial(network, addr)
51+
if err != nil {
52+
return nil, err
53+
}
54+
return conn, nil
55+
}
56+
57+
type tlsDialer struct {
58+
timeout time.Duration
59+
rawDialer Dialer
60+
config *tls.Config
61+
}
62+
63+
// see tls.DialWithDialer
64+
func (d tlsDialer) Dial(network, addr string) (net.Conn, error) {
65+
if d.config == nil {
66+
return nil, errors.New("tlsConfig must not be nil")
67+
}
68+
if d.rawDialer == nil {
69+
return nil, errors.New("rawDialer must not be nil")
70+
}
71+
72+
timeout := d.timeout
73+
74+
var errChannel chan error
75+
76+
if timeout != 0 {
77+
errChannel = make(chan error, 2)
78+
timer := time.AfterFunc(timeout, func() {
79+
errChannel <- errors.Errorf("Handshake timeout to %s after %v", addr, timeout)
80+
})
81+
defer timer.Stop()
82+
}
83+
84+
rawConn, err := d.rawDialer.Dial(network, addr)
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
colonPos := strings.LastIndex(addr, ":")
90+
if colonPos == -1 {
91+
colonPos = len(addr)
92+
}
93+
hostname := addr[:colonPos]
94+
95+
config := d.config
96+
97+
// If no ServerName is set, infer the ServerName
98+
// from the hostname we're connecting to.
99+
if config.ServerName == "" {
100+
// Make a copy to avoid polluting argument or default.
101+
c := config.Clone()
102+
c.ServerName = hostname
103+
config = c
104+
}
105+
106+
conn := tls.Client(rawConn, config)
107+
108+
if timeout == 0 {
109+
err = conn.Handshake()
110+
} else {
111+
go func() {
112+
errChannel <- conn.Handshake()
113+
}()
114+
115+
err = <-errChannel
116+
}
117+
118+
if err != nil {
119+
rawConn.Close()
120+
return nil, err
121+
}
122+
123+
return conn, nil
124+
}

vendor/golang.org/x/net/proxy/direct.go

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)