Skip to content

Commit 45e9794

Browse files
committed
Dynamic listener: advertise default-listener-ip if dynamic-advertised-listener is not provided
1 parent 7e9b6b9 commit 45e9794

File tree

4 files changed

+14
-6
lines changed

4 files changed

+14
-6
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ See:
8585
--debug-listen-address string Debug listen address (default "0.0.0.0:6060")
8686
--default-listener-ip string Default listener IP (default "127.0.0.1")
8787
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
88-
--dynamic-advertised-listener Advertised address for dynamic listeners (default "127.0.0.1")
88+
--dynamic-advertised-listener Advertised address for dynamic listeners. If empty, default-listener-ip is used
8989
--dynamic-listeners-disable Disable dynamic listeners.
9090
--dynamic-sequential-min-port int If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.
9191
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started

cmd/kafka-proxy/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func init() {
8383
func initFlags() {
8484
// proxy
8585
Server.Flags().StringVar(&c.Proxy.DefaultListenerIP, "default-listener-ip", "127.0.0.1", "Default listener IP")
86-
Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "127.0.0.1", "Advertised address for dynamic listeners")
86+
Server.Flags().StringVar(&c.Proxy.DynamicAdvertisedListener, "dynamic-advertised-listener", "", "Advertised address for dynamic listeners. If empty, default-listener-ip is used")
8787
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
8888
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
8989
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")

config/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,6 @@ func NewConfig() *Config {
252252
c.Http.HealthPath = "/health"
253253

254254
c.Proxy.DefaultListenerIP = "127.0.0.1"
255-
c.Proxy.DynamicAdvertisedListener = "127.0.0.1"
256255
c.Proxy.DisableDynamicListeners = false
257256
c.Proxy.RequestBufferSize = 4096
258257
c.Proxy.ResponseBufferSize = 4096

proxy/proxy.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32,
143143
defer p.lock.Unlock()
144144
// double check
145145
if v, ok := p.brokerToListenerConfig[brokerAddress]; ok {
146-
return v.AdvertisedAddress, 0, nil
146+
return util.SplitHostPort(v.AdvertisedAddress)
147147
}
148148

149149
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
@@ -158,9 +158,18 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32,
158158
}
159159
port := l.Addr().(*net.TCPAddr).Port
160160
address := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(port))
161-
advertisedAddress := net.JoinHostPort(p.dynamicAdvertisedListener, fmt.Sprint(port))
161+
162+
dynamicAdvertisedListener := p.dynamicAdvertisedListener
163+
if dynamicAdvertisedListener == "" {
164+
dynamicAdvertisedListener = p.defaultListenerIP
165+
}
166+
167+
advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
162168
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
163-
return p.defaultListenerIP, int32(port), nil
169+
170+
logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
171+
172+
return dynamicAdvertisedListener, int32(port), nil
164173
}
165174

166175
func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn, error) {

0 commit comments

Comments
 (0)