From a42c4b5f86c42f69516f24c783613dca5b9b6bd5 Mon Sep 17 00:00:00 2001 From: Alexander Berger Date: Tue, 11 Mar 2025 15:01:11 +0100 Subject: [PATCH] Allow constraining dynamic listener ports to a specific port range --- cmd/kafka-proxy/server.go | 3 +- cmd/kafka-proxy/server_test.go | 72 ++++++++++++++++++++++++++++++++++ config/config.go | 21 +++++++++- proxy/proxy.go | 42 +++++++++++++++----- 4 files changed, 126 insertions(+), 12 deletions(-) diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index 6c23cd94..f888f61c 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -93,7 +93,8 @@ func initFlags() { Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment") Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).") Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.") - Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.") + Server.Flags().Uint16Var(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.") + Server.Flags().Uint16Var(&c.Proxy.DynamicSequentialMaxPorts, "dynamic-sequential-max-ports", 0, "If set to non-zero, ports are allocated sequentially from the half open interval [dynamic-sequential-min-port, dynamic-sequential-min-port + dynamic-sequential-max-ports)") Server.Flags().IntVar(&c.Proxy.RequestBufferSize, "proxy-request-buffer-size", 4096, "Request buffer size pro tcp connection") Server.Flags().IntVar(&c.Proxy.ResponseBufferSize, "proxy-response-buffer-size", 4096, "Response buffer size pro tcp connection") diff --git a/cmd/kafka-proxy/server_test.go b/cmd/kafka-proxy/server_test.go index e34d9bca..26a3dc80 100644 --- a/cmd/kafka-proxy/server_test.go +++ b/cmd/kafka-proxy/server_test.go @@ -228,6 +228,78 @@ func TestSameClientCertEnabledWithMissingFlags(t *testing.T) { }) } +func TestDynamicPortIntervals(t *testing.T) { + + setupBootstrapServersMappingTest() + noMinPort := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + "--deterministic-listeners", "", + } + + noMinPort2 := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + "--dynamic-sequential-max-ports", "2000", + } + + t.Run("MinPortMandatoryForDeterministicListeners", func(t *testing.T) { + serverPreRunFailure(t, noMinPort, "Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DeterministicListeners is enabled") + }) + t.Run("MinPortMandatoryIfMaxPortsIsSet", func(t *testing.T) { + serverPreRunFailure(t, noMinPort2, "Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DynamicSequentialMaxPorts is set") + }) + + args1 := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + "--dynamic-sequential-min-port", "2000", + "--dynamic-sequential-max-ports", "2000", + } + _ = Server.ParseFlags(args1) + err := Server.PreRunE(nil, args1) + a := assert.New(t) + a.Nil(err) + + args2 := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + "--dynamic-sequential-min-port", "2000", + "--dynamic-sequential-max-ports", "2000", + "--deterministic-listeners", "", + } + _ = Server.ParseFlags(args2) + err = Server.PreRunE(nil, args2) + a = assert.New(t) + a.Nil(err) + + args3 := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + "--dynamic-sequential-min-port", "2000", + "--deterministic-listeners", "", + } + _ = Server.ParseFlags(args3) + err = Server.PreRunE(nil, args3) + a = assert.New(t) + a.Nil(err) + + args4 := []string{"cobra.test", + "--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402", + "--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403", + } + _ = Server.ParseFlags(args4) + err = Server.PreRunE(nil, args4) + a = assert.New(t) + a.Nil(err) +} + func serverPreRunFailure(t *testing.T, cmdLineFlags []string, expectedErrorMsg string) { setupBootstrapServersMappingTest() diff --git a/config/config.go b/config/config.go index fbcb0599..dbeb0ac3 100644 --- a/config/config.go +++ b/config/config.go @@ -81,7 +81,8 @@ type Config struct { DialAddressMappings []DialAddressMapping DisableDynamicListeners bool DynamicAdvertisedListener string - DynamicSequentialMinPort int + DynamicSequentialMinPort uint16 + DynamicSequentialMaxPorts uint16 RequestBufferSize int ResponseBufferSize int ListenerReadBufferSize int // SO_RCVBUF @@ -432,5 +433,23 @@ func (c *Config) Validate() error { } } + + if !c.Proxy.DisableDynamicListeners { + if c.Proxy.DynamicSequentialMinPort == 0 && c.Proxy.DeterministicListeners { + // dynamic-sequential-min-port must be set for deterministic-listeners to be enabled, as the latter + // does not work with random (OS allocated ephemeral) ports. + return errors.New("Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DeterministicListeners is enabled") + } + if c.Proxy.DynamicSequentialMinPort == 0 && c.Proxy.DynamicSequentialMaxPorts > 0 { + // dynamic-sequential-min-port must be set if dynamic-sequential-max-ports is set, as the latter + // does not work with random (OS allocated ephemeral) ports. + return errors.New("Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DynamicSequentialMaxPorts is set") + } + // Set default for DynamicSequentialMaxPorts if DynamicSequentialMinPort is set, to make sure + // ports never exceed the 16-bit max port number of 65535. + if c.Proxy.DynamicSequentialMaxPorts == 0 && c.Proxy.DynamicSequentialMinPort > 0 { + c.Proxy.DynamicSequentialMaxPorts = uint16(65536 - uint32(c.Proxy.DynamicSequentialMinPort)) + } + } return nil } diff --git a/proxy/proxy.go b/proxy/proxy.go index 6fbf4190..d59972cb 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -28,9 +28,11 @@ type Listeners struct { listenFunc ListenFunc - deterministicListeners bool - disableDynamicListeners bool - dynamicSequentialMinPort int + deterministicListeners bool + disableDynamicListeners bool + dynamicSequentialMinPort uint16 + currentDynamicPortCounter uint64 + dynamicSequentialMaxPorts uint16 brokerToListenerConfig map[string]*ListenerConfig lock sync.RWMutex @@ -74,6 +76,8 @@ func NewListeners(cfg *config.Config) (*Listeners, error) { deterministicListeners: cfg.Proxy.DeterministicListeners, disableDynamicListeners: cfg.Proxy.DisableDynamicListeners, dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort, + currentDynamicPortCounter: 0, + dynamicSequentialMaxPorts: cfg.Proxy.DynamicSequentialMaxPorts, }, nil } @@ -149,6 +153,16 @@ func (p *Listeners) findListenerConfig(brokerId int32) *ListenerConfig { return nil } +// Make sure all dynamically allocated ports are within the half open interval +// [dynamicSequentialMinPort, dynamicSequentialMinPort + dynamicSequentialMaxPorts). +func (p *Listeners) nextDynamicPort(portOffset uint64, brokerAddress string, brokerId int32) (uint16, error) { + port := p.dynamicSequentialMinPort + uint16(portOffset%uint64(p.dynamicSequentialMaxPorts)) + if port < p.dynamicSequentialMinPort { + return 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, port) + } + return port, nil +} + func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) { p.lock.Lock() defer p.lock.Unlock() @@ -162,11 +176,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) if brokerId < 0 { return "", 0, fmt.Errorf("brokerId is negative %s %d", brokerAddress, brokerId) } - deterministicPort := p.dynamicSequentialMinPort + int(brokerId) - if deterministicPort < p.dynamicSequentialMinPort { - return "", 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, deterministicPort) + deterministicPort, err := p.nextDynamicPort(uint64(brokerId), brokerAddress, brokerId) + if err != nil { + return "", 0, err } - listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(deterministicPort)) + listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(int(deterministicPort))) cfg := p.findListenerConfig(brokerId) if cfg != nil { oldBrokerAddress := cfg.GetBrokerAddress() @@ -179,9 +193,17 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) return util.SplitHostPort(cfg.AdvertisedAddress) } } else { - listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(p.dynamicSequentialMinPort)) - if p.dynamicSequentialMinPort != 0 { - p.dynamicSequentialMinPort += 1 + if p.dynamicSequentialMinPort == 0 { + // Use random (non sequential) ephemeral free port, allocated by OS. + listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(0)) + } else { + // Use sequentially allocated port. + port, err := p.nextDynamicPort(uint64(p.currentDynamicPortCounter), brokerAddress, brokerId) + if err != nil { + return "", 0, err + } + listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(int(port))) + p.currentDynamicPortCounter += 1 } } cfg := NewListenerConfig(brokerAddress, listenerAddress, "", brokerId)