Skip to content

Commit 13e4f8b

Browse files
committed
Add support for connect to Kafka through HTTP Proxy using CONNECT method. Replace socks5-address, socks5-username and socks5-password parameters by forward-proxy parameter
1 parent e57694d commit 13e4f8b

File tree

6 files changed

+112
-66
lines changed

6 files changed

+112
-66
lines changed

README.md

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ See:
9090
--dynamic-listeners-disable Disable dynamic listeners.
9191
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started
9292
--forbidden-api-keys intSlice Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics
93+
--forward-proxy string URL of the forward proxy. Supported schemas are http and socks5
9394
-h, --help help for server
9495
--http-disable Disable HTTP endpoints
9596
--http-health-path string Path on which to health endpoint (default "/health")
@@ -121,9 +122,6 @@ See:
121122
--sasl-jaas-config-file string Location of JAAS config file with SASL username and password
122123
--sasl-password string SASL user password
123124
--sasl-username string SASL user name
124-
--socks5-address string Address of SOCKS5 proxy to connect through when connecting to kafka brokers
125-
--socks5-password string Password for SOCKS5 proxy Username/Password Authentication
126-
--socks5-username string Username for SOCKS5 proxy Username/Password Authentication
127125
--tls-ca-chain-cert-file string PEM encoded CA's certificate file
128126
--tls-client-cert-file string PEM encoded file with client certificate
129127
--tls-client-key-file string PEM encoded file with private key for the client certificate
@@ -212,17 +210,51 @@ Authentication between Kafka Proxy Client and Kafka Proxy Server with Google-ID
212210
213211
### Connect to Kafka through SOCKS5 Proxy example
214212
213+
Connect through test SOCKS5 Proxy server
214+
215+
```
216+
kafka-proxy tools socks5-proxy --addr localhost:1080
217+
215218
kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
216219
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
217220
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502"
218-
--socks5-address localhost:1080
221+
--forward-proxy socks5://localhost:1080
222+
223+
```
224+
225+
```
226+
kafka-proxy tools socks5-proxy --addr localhost:1080 --username my-proxy-user --password my-proxy-password
219227

220228
kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
221229
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
222230
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
223-
--socks5-address localhost:1080 \
224-
--socks5-username my-proxy-user \
225-
--socks5-password my-proxy-password
231+
--forward-proxy socks5://my-proxy-user:my-proxy-password@localhost:1080
232+
233+
```
234+
235+
### Connect to Kafka through HTTP Proxy example
236+
237+
Connect through test HTTP Proxy server using CONNECT method
238+
239+
```
240+
kafka-proxy tools http-proxy --addr localhost:3128
241+
242+
kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
243+
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
244+
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502"
245+
--forward-proxy http://localhost:3128
246+
247+
```
248+
249+
```
250+
kafka-proxy tools http-proxy --addr localhost:3128 --username my-proxy-user --password my-proxy-password
251+
252+
kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
253+
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
254+
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
255+
--forward-proxy http://my-proxy-user:my-proxy-password@localhost:3128
256+
257+
```
226258
227259
### Kubernetes sidecar container example
228260

cmd/kafka-proxy/server.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,8 @@ func init() {
149149
Server.Flags().StringVar(&c.Log.Format, "log-format", "text", "Log format text or json")
150150
Server.Flags().StringVar(&c.Log.Level, "log-level", "info", "Log level debug, info, warning, error, fatal or panic")
151151

152-
// Socks5 to Kafka
153-
Server.Flags().StringVar(&c.Socks5.ProxyAddress, "socks5-address", "", "Address of SOCKS5 proxy to connect through when connecting to kafka brokers")
154-
Server.Flags().StringVar(&c.Socks5.Username, "socks5-username", "", "Username for SOCKS5 proxy Username/Password Authentication")
155-
Server.Flags().StringVar(&c.Socks5.Password, "socks5-password", "", "Password for SOCKS5 proxy Username/Password Authentication")
152+
// Connect through Socks5 or HTTP CONNECT to Kafka
153+
Server.Flags().StringVar(&c.ForwardProxy.Url, "forward-proxy", "", "URL of the forward proxy. Supported schemas are socks5 and http")
156154
}
157155

158156
func Run(_ *cobra.Command, _ []string) {

cmd/tools/tools.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func httpProxyServer(cmd *cobra.Command, _ []string) error {
4848
proxy := goproxy.NewProxyHttpServer()
4949
proxy.Verbose = verbose
5050
if username != "" && password != "" {
51-
logrus.Info("HTTP proxy will require basic authentication")
51+
logrus.Info("HTTP proxy will require Basic Proxy-Authorization for CONNECT")
5252

5353
proxy.OnRequest().HandleConnect(auth.BasicConnect("", func(user, passwd string) bool {
5454
return user == username && passwd == password
@@ -65,12 +65,8 @@ func socks5ProxyServer(cmd *cobra.Command, _ []string) error {
6565
addr, _ := cmd.Flags().GetString("addr")
6666

6767
conf := &socks5.Config{}
68-
server, err := socks5.New(conf)
69-
if err != nil {
70-
return err
71-
}
7268
if username != "" && password != "" {
73-
logrus.Info("SOCKS5 proxy will require basic authentication", addr)
69+
logrus.Info("SOCKS5 proxy will require Username/Password Authentication")
7470

7571
authenticator := &socks5.UserPassAuthenticator{
7672
Credentials: socks5ProxyCredentials{
@@ -80,7 +76,10 @@ func socks5ProxyServer(cmd *cobra.Command, _ []string) error {
8076
}
8177
conf.AuthMethods = []socks5.Authenticator{authenticator}
8278
}
83-
79+
server, err := socks5.New(conf)
80+
if err != nil {
81+
return err
82+
}
8483
logrus.Infof("Starting SOCKS5 proxy server on %s", addr)
8584
return server.ListenAndServe("tcp", addr)
8685
}

config/config.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
66
"github.com/pkg/errors"
77
"net"
8+
"net/url"
89
"strings"
910
"time"
1011
)
@@ -120,10 +121,13 @@ type Config struct {
120121
JaasConfigFile string
121122
}
122123
}
123-
Socks5 struct {
124-
ProxyAddress string
125-
Username string
126-
Password string
124+
ForwardProxy struct {
125+
Url string
126+
127+
Scheme string
128+
Address string
129+
Username string
130+
Password string
127131
}
128132
}
129133

@@ -266,21 +270,32 @@ func (c *Config) Validate() error {
266270
if c.Auth.Gateway.Server.Enable && c.Auth.Gateway.Server.Timeout <= 0 {
267271
return errors.New("Auth.Gateway.Server.Timeout must be greater than 0")
268272
}
269-
if c.Socks5.ProxyAddress == "" && (c.Socks5.Username != "" || c.Socks5.Password != "") {
270-
return errors.New("Socks5.ProxyAddress must not be empty when Socks5 Username/Password is provided")
271-
}
272-
if (c.Socks5.Username != "" && c.Socks5.Password == "") || (c.Socks5.Username == "" && c.Socks5.Password != "") {
273-
return errors.New("Both Socks5 Username and Password must be provided provided")
274-
}
275-
if len(c.Socks5.Username) > 255 || len(c.Socks5.Password) > 255 {
276-
// RFC1929
277-
return errors.New("Max length of Socks5 Username/Password is 255 chars")
278-
}
279-
if c.Socks5.ProxyAddress != "" {
280-
if _, _, err := util.SplitHostPort(c.Socks5.ProxyAddress); err != nil {
273+
// http://username:password@hostname:port or socks5://username:password@hostname:port
274+
if c.ForwardProxy.Url != "" {
275+
var proxyUrl *url.URL
276+
var err error
277+
if proxyUrl, err = url.Parse(c.ForwardProxy.Url); err != nil {
281278
return err
282279
}
283-
}
280+
if proxyUrl.Port() == "" {
281+
return errors.New("Port part of ForwardProxy.Url must not be empty")
282+
}
283+
c.ForwardProxy.Address = proxyUrl.Host
284284

285+
if proxyUrl.Scheme != "http" && proxyUrl.Scheme != "socks5" {
286+
return errors.New("ForwardProxy.Url Scheme must be http or socks5")
287+
}
288+
c.ForwardProxy.Scheme = proxyUrl.Scheme
289+
290+
if proxyUrl.User != nil {
291+
password, _ := proxyUrl.User.Password()
292+
if proxyUrl.User.Username() == "" || password == "" {
293+
return errors.New("Both ForwardProxy Url Username and Password must be provided")
294+
}
295+
c.ForwardProxy.Username = proxyUrl.User.Username()
296+
c.ForwardProxy.Password = password
297+
}
298+
299+
}
285300
return nil
286301
}

proxy/client.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -109,24 +109,38 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
109109
}
110110

111111
func newDialer(c *config.Config, tlsConfig *tls.Config) (Dialer, error) {
112+
directDialer := directDialer{
113+
dialTimeout: c.Kafka.DialTimeout,
114+
keepAlive: c.Kafka.KeepAlive,
115+
}
116+
112117
var rawDialer Dialer
113-
if c.Socks5.ProxyAddress != "" {
114-
logrus.Infof("Kafka clients will connect through the SOCKS5 proxy %s", c.Socks5.ProxyAddress)
115-
rawDialer = &socks5Dialer{
116-
directDialer: directDialer{
117-
dialTimeout: c.Kafka.DialTimeout,
118-
keepAlive: c.Kafka.KeepAlive,
119-
},
120-
proxyNetwork: "tcp",
121-
proxyAddr: c.Socks5.ProxyAddress,
122-
username: c.Socks5.Username,
123-
password: c.Socks5.Password,
118+
if c.ForwardProxy.Url != "" {
119+
switch c.ForwardProxy.Scheme {
120+
case "socks5":
121+
logrus.Infof("Kafka clients will connect through the SOCKS5 proxy %s", c.ForwardProxy.Address)
122+
rawDialer = &socks5Dialer{
123+
directDialer: directDialer,
124+
proxyNetwork: "tcp",
125+
proxyAddr: c.ForwardProxy.Address,
126+
username: c.ForwardProxy.Username,
127+
password: c.ForwardProxy.Password,
128+
}
129+
case "http":
130+
logrus.Infof("Kafka clients will connect through the HTTP proxy %s using CONNECT", c.ForwardProxy.Address)
131+
132+
rawDialer = &httpProxy{
133+
forwardDialer: directDialer,
134+
network: "tcp",
135+
hostPort: c.ForwardProxy.Address,
136+
username: c.ForwardProxy.Username,
137+
password: c.ForwardProxy.Password,
138+
}
139+
default:
140+
return nil, errors.New("Only http or socks5 proxy is supported")
124141
}
125142
} else {
126-
rawDialer = directDialer{
127-
dialTimeout: c.Kafka.DialTimeout,
128-
keepAlive: c.Kafka.KeepAlive,
129-
}
143+
rawDialer = directDialer
130144
}
131145
if c.Kafka.TLS.Enable {
132146
if tlsConfig == nil {

proxy/dial.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func (d tlsDialer) Dial(network, addr string) (net.Conn, error) {
139139

140140
type httpProxy struct {
141141
forwardDialer Dialer
142+
network string
142143
hostPort string
143144
username, password string
144145
}
@@ -154,10 +155,11 @@ func (s *httpProxy) Dial(network, addr string) (net.Conn, error) {
154155
}
155156
req.Close = false
156157
if s.username != "" && s.password != "" {
157-
req.Header.Set("Proxy-Authorization", base64.StdEncoding.EncodeToString([]byte(s.username+":"+s.password)))
158+
basic := "Basic " + base64.StdEncoding.EncodeToString([]byte(s.username+":"+s.password))
159+
req.Header.Set("Proxy-Authorization", basic)
158160
}
159161

160-
c, err := s.forwardDialer.Dial("tcp", s.hostPort)
162+
c, err := s.forwardDialer.Dial(s.network, s.hostPort)
161163
if err != nil {
162164
return nil, err
163165
}
@@ -180,17 +182,3 @@ func (s *httpProxy) Dial(network, addr string) (net.Conn, error) {
180182

181183
return c, nil
182184
}
183-
184-
func newHTTPProxy(uri *url.URL, forward Dialer) (Dialer, error) {
185-
s := new(httpProxy)
186-
s.hostPort = uri.Host
187-
if uri.Port() == "" {
188-
return nil, fmt.Errorf("http proxy url doesn't contain a port [%v]", uri)
189-
}
190-
s.forwardDialer = forward
191-
if uri.User != nil {
192-
s.username = uri.User.Username()
193-
s.password, _ = uri.User.Password()
194-
}
195-
return s, nil
196-
}

0 commit comments

Comments
 (0)