Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func initFlags() {
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
Server.Flags().Uint16Var(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
Server.Flags().Uint16Var(&c.Proxy.DynamicSequentialMaxPorts, "dynamic-sequential-max-ports", 0, "If set to non-zero, ports are allocated sequentially from the half open interval [dynamic-sequential-min-port, dynamic-sequential-min-port + dynamic-sequential-max-ports)")

Server.Flags().IntVar(&c.Proxy.RequestBufferSize, "proxy-request-buffer-size", 4096, "Request buffer size pro tcp connection")
Server.Flags().IntVar(&c.Proxy.ResponseBufferSize, "proxy-response-buffer-size", 4096, "Response buffer size pro tcp connection")
Expand Down
72 changes: 72 additions & 0 deletions cmd/kafka-proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,78 @@ func TestSameClientCertEnabledWithMissingFlags(t *testing.T) {
})
}

func TestDynamicPortIntervals(t *testing.T) {

setupBootstrapServersMappingTest()
noMinPort := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
"--deterministic-listeners", "",
}

noMinPort2 := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
"--dynamic-sequential-max-ports", "2000",
}

t.Run("MinPortMandatoryForDeterministicListeners", func(t *testing.T) {
serverPreRunFailure(t, noMinPort, "Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DeterministicListeners is enabled")
})
t.Run("MinPortMandatoryIfMaxPortsIsSet", func(t *testing.T) {
serverPreRunFailure(t, noMinPort2, "Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DynamicSequentialMaxPorts is set")
})

args1 := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
"--dynamic-sequential-min-port", "2000",
"--dynamic-sequential-max-ports", "2000",
}
_ = Server.ParseFlags(args1)
err := Server.PreRunE(nil, args1)
a := assert.New(t)
a.Nil(err)

args2 := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
"--dynamic-sequential-min-port", "2000",
"--dynamic-sequential-max-ports", "2000",
"--deterministic-listeners", "",
}
_ = Server.ParseFlags(args2)
err = Server.PreRunE(nil, args2)
a = assert.New(t)
a.Nil(err)

args3 := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
"--dynamic-sequential-min-port", "2000",
"--deterministic-listeners", "",
}
_ = Server.ParseFlags(args3)
err = Server.PreRunE(nil, args3)
a = assert.New(t)
a.Nil(err)

args4 := []string{"cobra.test",
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
}
_ = Server.ParseFlags(args4)
err = Server.PreRunE(nil, args4)
a = assert.New(t)
a.Nil(err)
}

func serverPreRunFailure(t *testing.T, cmdLineFlags []string, expectedErrorMsg string) {
setupBootstrapServersMappingTest()

Expand Down
21 changes: 20 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ type Config struct {
DialAddressMappings []DialAddressMapping
DisableDynamicListeners bool
DynamicAdvertisedListener string
DynamicSequentialMinPort int
DynamicSequentialMinPort uint16
DynamicSequentialMaxPorts uint16
RequestBufferSize int
ResponseBufferSize int
ListenerReadBufferSize int // SO_RCVBUF
Expand Down Expand Up @@ -432,5 +433,23 @@ func (c *Config) Validate() error {
}

}

if !c.Proxy.DisableDynamicListeners {
if c.Proxy.DynamicSequentialMinPort == 0 && c.Proxy.DeterministicListeners {
// dynamic-sequential-min-port must be set for deterministic-listeners to be enabled, as the latter
// does not work with random (OS allocated ephemeral) ports.
return errors.New("Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DeterministicListeners is enabled")
}
if c.Proxy.DynamicSequentialMinPort == 0 && c.Proxy.DynamicSequentialMaxPorts > 0 {
// dynamic-sequential-min-port must be set if dynamic-sequential-max-ports is set, as the latter
// does not work with random (OS allocated ephemeral) ports.
return errors.New("Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DynamicSequentialMaxPorts is set")
}
// Set default for DynamicSequentialMaxPorts if DynamicSequentialMinPort is set, to make sure
// ports never exceed the 16-bit max port number of 65535.
if c.Proxy.DynamicSequentialMaxPorts == 0 && c.Proxy.DynamicSequentialMinPort > 0 {
c.Proxy.DynamicSequentialMaxPorts = uint16(65536 - uint32(c.Proxy.DynamicSequentialMinPort))
}
}
return nil
}
42 changes: 32 additions & 10 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ type Listeners struct {

listenFunc ListenFunc

deterministicListeners bool
disableDynamicListeners bool
dynamicSequentialMinPort int
deterministicListeners bool
disableDynamicListeners bool
dynamicSequentialMinPort uint16
currentDynamicPortCounter uint64
dynamicSequentialMaxPorts uint16

brokerToListenerConfig map[string]*ListenerConfig
lock sync.RWMutex
Expand Down Expand Up @@ -74,6 +76,8 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
deterministicListeners: cfg.Proxy.DeterministicListeners,
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
currentDynamicPortCounter: 0,
dynamicSequentialMaxPorts: cfg.Proxy.DynamicSequentialMaxPorts,
}, nil
}

Expand Down Expand Up @@ -149,6 +153,16 @@ func (p *Listeners) findListenerConfig(brokerId int32) *ListenerConfig {
return nil
}

// Make sure all dynamically allocated ports are within the half open interval
// [dynamicSequentialMinPort, dynamicSequentialMinPort + dynamicSequentialMaxPorts).
func (p *Listeners) nextDynamicPort(portOffset uint64, brokerAddress string, brokerId int32) (uint16, error) {
port := p.dynamicSequentialMinPort + uint16(portOffset%uint64(p.dynamicSequentialMaxPorts))
if port < p.dynamicSequentialMinPort {
return 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, port)
}
return port, nil
}

func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) {
p.lock.Lock()
defer p.lock.Unlock()
Expand All @@ -162,11 +176,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
if brokerId < 0 {
return "", 0, fmt.Errorf("brokerId is negative %s %d", brokerAddress, brokerId)
}
deterministicPort := p.dynamicSequentialMinPort + int(brokerId)
if deterministicPort < p.dynamicSequentialMinPort {
return "", 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, deterministicPort)
deterministicPort, err := p.nextDynamicPort(uint64(brokerId), brokerAddress, brokerId)
if err != nil {
return "", 0, err
}
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(deterministicPort))
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(int(deterministicPort)))
cfg := p.findListenerConfig(brokerId)
if cfg != nil {
oldBrokerAddress := cfg.GetBrokerAddress()
Expand All @@ -179,9 +193,17 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
return util.SplitHostPort(cfg.AdvertisedAddress)
}
} else {
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(p.dynamicSequentialMinPort))
if p.dynamicSequentialMinPort != 0 {
p.dynamicSequentialMinPort += 1
if p.dynamicSequentialMinPort == 0 {
// Use random (non sequential) ephemeral free port, allocated by OS.
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(0))
} else {
// Use sequentially allocated port.
port, err := p.nextDynamicPort(uint64(p.currentDynamicPortCounter), brokerAddress, brokerId)
if err != nil {
return "", 0, err
}
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(int(port)))
p.currentDynamicPortCounter += 1
}
}
cfg := NewListenerConfig(brokerAddress, listenerAddress, "", brokerId)
Expand Down