Skip to content

Commit 7e9b6b9

Browse files
authored
Merge pull request #56 from Chrisss93/dynamicListenerAddress
Advertised address for dynamic listeners
2 parents 878b182 + 5a6417e commit 7e9b6b9

File tree

4 files changed

+28
-19
lines changed

4 files changed

+28
-19
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ See:
8585
--debug-listen-address string Debug listen address (default "0.0.0.0:6060")
8686
--default-listener-ip string Default listener IP (default "127.0.0.1")
8787
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
88+
--dynamic-advertised-listener Advertised address for dynamic listeners (default "127.0.0.1")
8889
--dynamic-listeners-disable Disable dynamic listeners.
8990
--dynamic-sequential-min-port int If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.
9091
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started

cmd/kafka-proxy/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func init() {
8383
func initFlags() {
8484
// proxy
8585
Server.Flags().StringVar(&c.Proxy.DefaultListenerIP, "default-listener-ip", "127.0.0.1", "Default listener IP")
86+
Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "127.0.0.1", "Advertised address for dynamic listeners")
8687
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
8788
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
8889
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")

config/config.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,18 @@ type Config struct {
4949
MsgFiledName string
5050
}
5151
Proxy struct {
52-
DefaultListenerIP string
53-
BootstrapServers []ListenerConfig
54-
ExternalServers []ListenerConfig
55-
DialAddressMappings []DialAddressMapping
56-
DisableDynamicListeners bool
57-
DynamicSequentialMinPort int
58-
RequestBufferSize int
59-
ResponseBufferSize int
60-
ListenerReadBufferSize int // SO_RCVBUF
61-
ListenerWriteBufferSize int // SO_SNDBUF
62-
ListenerKeepAlive time.Duration
52+
DefaultListenerIP string
53+
BootstrapServers []ListenerConfig
54+
ExternalServers []ListenerConfig
55+
DialAddressMappings []DialAddressMapping
56+
DisableDynamicListeners bool
57+
DynamicAdvertisedListener string
58+
DynamicSequentialMinPort int
59+
RequestBufferSize int
60+
ResponseBufferSize int
61+
ListenerReadBufferSize int // SO_RCVBUF
62+
ListenerWriteBufferSize int // SO_SNDBUF
63+
ListenerKeepAlive time.Duration
6364

6465
TLS struct {
6566
Enable bool
@@ -251,6 +252,7 @@ func NewConfig() *Config {
251252
c.Http.HealthPath = "/health"
252253

253254
c.Proxy.DefaultListenerIP = "127.0.0.1"
255+
c.Proxy.DynamicAdvertisedListener = "127.0.0.1"
254256
c.Proxy.DisableDynamicListeners = false
255257
c.Proxy.RequestBufferSize = 4096
256258
c.Proxy.ResponseBufferSize = 4096

proxy/proxy.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ type Listeners struct {
1717
connSrc chan Conn
1818
// listen IP for dynamically start
1919
defaultListenerIP string
20+
// advertised address for dynamic listeners
21+
dynamicAdvertisedListener string
2022
// socket TCP options
2123
tcpConnOptions TCPConnOptions
2224

@@ -32,6 +34,7 @@ type Listeners struct {
3234
func NewListeners(cfg *config.Config) (*Listeners, error) {
3335

3436
defaultListenerIP := cfg.Proxy.DefaultListenerIP
37+
dynamicAdvertisedListener := cfg.Proxy.DynamicAdvertisedListener
3538

3639
tcpConnOptions := TCPConnOptions{
3740
KeepAlive: cfg.Proxy.ListenerKeepAlive,
@@ -61,13 +64,14 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6164
}
6265

6366
return &Listeners{
64-
defaultListenerIP: defaultListenerIP,
65-
connSrc: make(chan Conn, 1),
66-
brokerToListenerConfig: brokerToListenerConfig,
67-
tcpConnOptions: tcpConnOptions,
68-
listenFunc: listenFunc,
69-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
70-
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
67+
defaultListenerIP: defaultListenerIP,
68+
dynamicAdvertisedListener: dynamicAdvertisedListener,
69+
connSrc: make(chan Conn, 1),
70+
brokerToListenerConfig: brokerToListenerConfig,
71+
tcpConnOptions: tcpConnOptions,
72+
listenFunc: listenFunc,
73+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
74+
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
7175
}, nil
7276
}
7377

@@ -154,7 +158,8 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32,
154158
}
155159
port := l.Addr().(*net.TCPAddr).Port
156160
address := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(port))
157-
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: address}
161+
advertisedAddress := net.JoinHostPort(p.dynamicAdvertisedListener, fmt.Sprint(port))
162+
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
158163
return p.defaultListenerIP, int32(port), nil
159164
}
160165

0 commit comments

Comments
 (0)