Skip to content

Commit b930b9d

Browse files
committed
Deterministic listeners WIP
1 parent 44b9dcb commit b930b9d

File tree

7 files changed

+166
-42
lines changed

7 files changed

+166
-42
lines changed

cmd/kafka-proxy/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func initFlags() {
8989
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
9090
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
9191
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
92+
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
9293
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
9394
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
9495

config/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ var (
2222
Version = "unknown"
2323
)
2424

25-
type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error)
25+
type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error)
2626

2727
type ListenerConfig struct {
2828
BrokerAddress string
2929
ListenerAddress string
3030
AdvertisedAddress string
31+
Listener net.Listener
3132
}
3233
type DialAddressMapping struct {
3334
SourceAddress string
@@ -74,6 +75,7 @@ type Config struct {
7475
DefaultListenerIP string
7576
BootstrapServers []ListenerConfig
7677
ExternalServers []ListenerConfig
78+
DeterministicListeners bool
7779
DialAddressMappings []DialAddressMapping
7880
DisableDynamicListeners bool
7981
DynamicAdvertisedListener string

proxy/processor_default_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package proxy
33
import (
44
"bytes"
55
"encoding/hex"
6+
"testing"
7+
"time"
8+
69
"github.com/grepplabs/kafka-proxy/proxy/protocol"
710
"github.com/pkg/errors"
811
"github.com/stretchr/testify/assert"
9-
"testing"
10-
"time"
1112
)
1213

1314
func TestHandleRequest(t *testing.T) {
@@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) {
130131
}
131132

132133
func TestHandleResponse(t *testing.T) {
133-
netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
134+
netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
134135
if brokerHost == "localhost" {
135136
switch brokerPort {
136137
case 19092:

proxy/protocol/responses.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const (
1414
brokersKeyName = "brokers"
1515
hostKeyName = "host"
1616
portKeyName = "port"
17+
nodeKeyName = "node_id"
1718

1819
coordinatorKeyName = "coordinator"
1920
coordinatorsKeyName = "coordinators"
@@ -320,12 +321,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
320321
if !ok {
321322
return errors.New("broker.port not found")
322323
}
324+
nodeId, ok := broker.Get(nodeKeyName).(int32)
325+
if !ok {
326+
return errors.New("broker.node_id not found")
327+
}
323328

324329
if host == "" && port <= 0 {
325330
continue
326331
}
327332

328-
newHost, newPort, err := fn(host, port)
333+
newHost, newPort, err := fn(host, port, nodeId)
329334
if err != nil {
330335
return err
331336
}
@@ -383,12 +388,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
383388
if !ok {
384389
return errors.New("coordinator.port not found")
385390
}
391+
nodeId, ok := coordinator.Get(nodeKeyName).(int32)
392+
if !ok {
393+
return errors.New("coordinator.node_id not found")
394+
}
386395

387396
if host == "" && port <= 0 {
388397
return nil
389398
}
390399

391-
newHost, newPort, err := fn(host, port)
400+
newHost, newPort, err := fn(host, port, nodeId)
392401
if err != nil {
393402
return err
394403
}

proxy/protocol/responses_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package protocol
33
import (
44
"encoding/hex"
55
"fmt"
6-
"github.com/google/uuid"
76
"reflect"
87
"strings"
98
"testing"
109

10+
"github.com/google/uuid"
11+
1112
"github.com/grepplabs/kafka-proxy/config"
1213
"github.com/pkg/errors"
1314
"github.com/stretchr/testify/assert"
@@ -20,7 +21,7 @@ var (
2021
// topic_metadata
2122
0x00, 0x00, 0x00, 0x00}
2223

23-
testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
24+
testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
2425
if brokerHost == "localhost" && brokerPort == 51 {
2526
return "myhost1", 34001, nil
2627
} else if brokerHost == "google.com" && brokerPort == 273 {
@@ -31,7 +32,7 @@ var (
3132
return "", 0, errors.New("unexpected data")
3233
}
3334

34-
testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
35+
testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
3536
if brokerHost == "localhost" && brokerPort == 19092 {
3637
return "myhost1", 34001, nil
3738
} else if brokerHost == "localhost" && brokerPort == 29092 {
@@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) {
374375
a.Nil(err)
375376
a.Equal(bytes, resp)
376377

377-
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
378+
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
378379
if brokerHost == "localhost" && brokerPort == 51 {
379380
return "azure.microsoft.com", 34001, nil
380381
} else if brokerHost == "google.com" && brokerPort == 273 {

proxy/proxy.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ type Listeners struct {
2525

2626
listenFunc ListenFunc
2727

28+
deterministicListeners bool
2829
disableDynamicListeners bool
2930
dynamicSequentialMinPort int
3031

31-
brokerToListenerConfig map[string]config.ListenerConfig
32-
lock sync.RWMutex
32+
brokerToListenerConfig map[string]config.ListenerConfig
33+
brokerIdToListenerConfig map[int32]config.ListenerConfig
34+
lock sync.RWMutex
3335
}
3436

3537
func NewListeners(cfg *config.Config) (*Listeners, error) {
@@ -64,13 +66,17 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6466
return nil, err
6567
}
6668

69+
brokerIdToListenerConfig := make(map[int32]config.ListenerConfig)
70+
6771
return &Listeners{
6872
defaultListenerIP: defaultListenerIP,
6973
dynamicAdvertisedListener: dynamicAdvertisedListener,
7074
connSrc: make(chan Conn, 1),
7175
brokerToListenerConfig: brokerToListenerConfig,
76+
brokerIdToListenerConfig: brokerIdToListenerConfig,
7277
tcpConnOptions: tcpConnOptions,
7378
listenFunc: listenFunc,
79+
deterministicListeners: cfg.Proxy.DeterministicListeners,
7480
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
7581
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
7682
}, nil
@@ -117,7 +123,7 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
117123
return brokerToListenerConfig, nil
118124
}
119125

120-
func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
126+
func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
121127
if brokerHost == "" || brokerPort <= 0 {
122128
return "", 0, fmt.Errorf("broker address '%s:%d' is invalid", brokerHost, brokerPort)
123129
}
@@ -126,30 +132,46 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l
126132

127133
p.lock.RLock()
128134
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
135+
idListenerConfig, brokerIdFound := p.brokerIdToListenerConfig[brokerId]
129136
p.lock.RUnlock()
130137

131138
if ok {
132139
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
133140
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
134141
}
135142
if !p.disableDynamicListeners {
143+
if brokerIdFound {
144+
// Existing broker ID found, but with a different upstream broker
145+
// Close existing listener, remove two mappings:
146+
// * ID to removed upstream broker
147+
// * removed upstream broker
148+
idListenerConfig.Listener.Close()
149+
delete(p.brokerIdToListenerConfig, brokerId)
150+
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
151+
}
136152
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
137-
return p.ListenDynamicInstance(brokerAddress)
153+
return p.ListenDynamicInstance(brokerAddress, brokerId)
138154
}
139155
return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort)
140156
}
141157

142-
func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, error) {
158+
func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) {
143159
p.lock.Lock()
144160
defer p.lock.Unlock()
145161
// double check
146162
if v, ok := p.brokerToListenerConfig[brokerAddress]; ok {
147163
return util.SplitHostPort(v.AdvertisedAddress)
148164
}
149165

150-
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
151-
if p.dynamicSequentialMinPort != 0 {
152-
p.dynamicSequentialMinPort += 1
166+
var defaultListenerAddress string
167+
168+
if p.deterministicListeners {
169+
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
170+
} else {
171+
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
172+
if p.dynamicSequentialMinPort != 0 {
173+
p.dynamicSequentialMinPort += 1
174+
}
153175
}
154176

155177
cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
@@ -166,7 +188,8 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32,
166188
}
167189

168190
advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
169-
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
191+
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress, Listener: l}
192+
p.brokerIdToListenerConfig[brokerId] = p.brokerToListenerConfig[brokerAddress]
170193

171194
logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
172195

0 commit comments

Comments
 (0)