Skip to content

Commit b8646e4

Browse files
committed
feat: Add support for deterministic listener ports (based on broker ID)
1 parent 44b9dcb commit b8646e4

File tree

7 files changed

+179
-49
lines changed

7 files changed

+179
-49
lines changed

cmd/kafka-proxy/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func initFlags() {
8989
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
9090
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
9191
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
92+
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
9293
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
9394
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
9495

config/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@ var (
2222
Version = "unknown"
2323
)
2424

25-
type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error)
25+
type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error)
2626

2727
type ListenerConfig struct {
2828
BrokerAddress string
2929
ListenerAddress string
3030
AdvertisedAddress string
3131
}
32+
type IdListenerConfig struct {
33+
BrokerAddress string
34+
Listener net.Listener
35+
}
3236
type DialAddressMapping struct {
3337
SourceAddress string
3438
DestinationAddress string
@@ -74,6 +78,7 @@ type Config struct {
7478
DefaultListenerIP string
7579
BootstrapServers []ListenerConfig
7680
ExternalServers []ListenerConfig
81+
DeterministicListeners bool
7782
DialAddressMappings []DialAddressMapping
7883
DisableDynamicListeners bool
7984
DynamicAdvertisedListener string

proxy/processor_default_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package proxy
33
import (
44
"bytes"
55
"encoding/hex"
6+
"testing"
7+
"time"
8+
69
"github.com/grepplabs/kafka-proxy/proxy/protocol"
710
"github.com/pkg/errors"
811
"github.com/stretchr/testify/assert"
9-
"testing"
10-
"time"
1112
)
1213

1314
func TestHandleRequest(t *testing.T) {
@@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) {
130131
}
131132

132133
func TestHandleResponse(t *testing.T) {
133-
netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
134+
netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
134135
if brokerHost == "localhost" {
135136
switch brokerPort {
136137
case 19092:

proxy/protocol/responses.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const (
1414
brokersKeyName = "brokers"
1515
hostKeyName = "host"
1616
portKeyName = "port"
17+
nodeKeyName = "node_id"
1718

1819
coordinatorKeyName = "coordinator"
1920
coordinatorsKeyName = "coordinators"
@@ -320,12 +321,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
320321
if !ok {
321322
return errors.New("broker.port not found")
322323
}
324+
nodeId, ok := broker.Get(nodeKeyName).(int32)
325+
if !ok {
326+
return errors.New("broker.node_id not found")
327+
}
323328

324329
if host == "" && port <= 0 {
325330
continue
326331
}
327332

328-
newHost, newPort, err := fn(host, port)
333+
newHost, newPort, err := fn(host, port, nodeId)
329334
if err != nil {
330335
return err
331336
}
@@ -383,12 +388,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
383388
if !ok {
384389
return errors.New("coordinator.port not found")
385390
}
391+
nodeId, ok := coordinator.Get(nodeKeyName).(int32)
392+
if !ok {
393+
return errors.New("coordinator.node_id not found")
394+
}
386395

387396
if host == "" && port <= 0 {
388397
return nil
389398
}
390399

391-
newHost, newPort, err := fn(host, port)
400+
newHost, newPort, err := fn(host, port, nodeId)
392401
if err != nil {
393402
return err
394403
}

proxy/protocol/responses_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package protocol
33
import (
44
"encoding/hex"
55
"fmt"
6-
"github.com/google/uuid"
76
"reflect"
87
"strings"
98
"testing"
109

10+
"github.com/google/uuid"
11+
1112
"github.com/grepplabs/kafka-proxy/config"
1213
"github.com/pkg/errors"
1314
"github.com/stretchr/testify/assert"
@@ -20,7 +21,7 @@ var (
2021
// topic_metadata
2122
0x00, 0x00, 0x00, 0x00}
2223

23-
testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
24+
testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
2425
if brokerHost == "localhost" && brokerPort == 51 {
2526
return "myhost1", 34001, nil
2627
} else if brokerHost == "google.com" && brokerPort == 273 {
@@ -31,7 +32,7 @@ var (
3132
return "", 0, errors.New("unexpected data")
3233
}
3334

34-
testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
35+
testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
3536
if brokerHost == "localhost" && brokerPort == 19092 {
3637
return "myhost1", 34001, nil
3738
} else if brokerHost == "localhost" && brokerPort == 29092 {
@@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) {
374375
a.Nil(err)
375376
a.Equal(bytes, resp)
376377

377-
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
378+
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
378379
if brokerHost == "localhost" && brokerPort == 51 {
379380
return "azure.microsoft.com", 34001, nil
380381
} else if brokerHost == "google.com" && brokerPort == 273 {

proxy/proxy.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ type Listeners struct {
2525

2626
listenFunc ListenFunc
2727

28+
deterministicListeners bool
2829
disableDynamicListeners bool
2930
dynamicSequentialMinPort int
3031

31-
brokerToListenerConfig map[string]config.ListenerConfig
32-
lock sync.RWMutex
32+
brokerToListenerConfig map[string]config.ListenerConfig
33+
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
34+
lock sync.RWMutex
3335
}
3436

3537
func NewListeners(cfg *config.Config) (*Listeners, error) {
@@ -64,15 +66,19 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6466
return nil, err
6567
}
6668

69+
brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)
70+
6771
return &Listeners{
68-
defaultListenerIP: defaultListenerIP,
69-
dynamicAdvertisedListener: dynamicAdvertisedListener,
70-
connSrc: make(chan Conn, 1),
71-
brokerToListenerConfig: brokerToListenerConfig,
72-
tcpConnOptions: tcpConnOptions,
73-
listenFunc: listenFunc,
74-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
75-
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
72+
defaultListenerIP: defaultListenerIP,
73+
dynamicAdvertisedListener: dynamicAdvertisedListener,
74+
connSrc: make(chan Conn, 1),
75+
brokerToListenerConfig: brokerToListenerConfig,
76+
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
77+
tcpConnOptions: tcpConnOptions,
78+
listenFunc: listenFunc,
79+
deterministicListeners: cfg.Proxy.DeterministicListeners,
80+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
81+
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
7682
}, nil
7783
}
7884

@@ -117,7 +123,7 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
117123
return brokerToListenerConfig, nil
118124
}
119125

120-
func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
126+
func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
121127
if brokerHost == "" || brokerPort <= 0 {
122128
return "", 0, fmt.Errorf("broker address '%s:%d' is invalid", brokerHost, brokerPort)
123129
}
@@ -126,30 +132,49 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l
126132

127133
p.lock.RLock()
128134
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
135+
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
129136
p.lock.RUnlock()
130137

131138
if ok {
132139
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
133140
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
134141
}
135142
if !p.disableDynamicListeners {
143+
if brokerIdFound {
144+
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
145+
// Existing broker ID found, but with a different upstream broker
146+
// Close existing listener, remove two mappings:
147+
// * ID to removed upstream broker
148+
// * removed upstream broker
149+
idListenerConfig.Listener.Close()
150+
p.lock.Lock()
151+
delete(p.brokerIdToIdListenerConfig, brokerId)
152+
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
153+
p.lock.Unlock()
154+
}
136155
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
137-
return p.ListenDynamicInstance(brokerAddress)
156+
return p.ListenDynamicInstance(brokerAddress, brokerId)
138157
}
139158
return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort)
140159
}
141160

142-
func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, error) {
161+
func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) {
143162
p.lock.Lock()
144163
defer p.lock.Unlock()
145164
// double check
146165
if v, ok := p.brokerToListenerConfig[brokerAddress]; ok {
147166
return util.SplitHostPort(v.AdvertisedAddress)
148167
}
149168

150-
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
151-
if p.dynamicSequentialMinPort != 0 {
152-
p.dynamicSequentialMinPort += 1
169+
var defaultListenerAddress string
170+
171+
if p.deterministicListeners {
172+
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
173+
} else {
174+
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
175+
if p.dynamicSequentialMinPort != 0 {
176+
p.dynamicSequentialMinPort += 1
177+
}
153178
}
154179

155180
cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
@@ -167,6 +192,7 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32,
167192

168193
advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
169194
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
195+
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}
170196

171197
logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
172198

0 commit comments

Comments
 (0)