Skip to content

Commit 054aa94

Browse files
committed
prepare for rework
1 parent b8646e4 commit 054aa94

File tree

5 files changed

+116
-99
lines changed

5 files changed

+116
-99
lines changed

cmd/kafka-proxy/server.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ func initFlags() {
8989
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
9090
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
9191
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
92-
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
9392
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
9493
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
9594

config/config.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ type ListenerConfig struct {
2929
ListenerAddress string
3030
AdvertisedAddress string
3131
}
32-
type IdListenerConfig struct {
33-
BrokerAddress string
34-
Listener net.Listener
32+
33+
type EnhancedListenerConfig struct {
34+
ListenerConfig
35+
BrokerID int32
3536
}
37+
3638
type DialAddressMapping struct {
3739
SourceAddress string
3840
DestinationAddress string
@@ -78,7 +80,6 @@ type Config struct {
7880
DefaultListenerIP string
7981
BootstrapServers []ListenerConfig
8082
ExternalServers []ListenerConfig
81-
DeterministicListeners bool
8283
DialAddressMappings []DialAddressMapping
8384
DisableDynamicListeners bool
8485
DynamicAdvertisedListener string

proxy/protocol/responses.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var (
2727

2828
func createMetadataResponseSchemaVersions() []Schema {
2929
metadataBrokerV0 := NewSchema("metadata_broker_v0",
30-
&Mfield{Name: "node_id", Ty: TypeInt32},
30+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
3131
&Mfield{Name: hostKeyName, Ty: TypeStr},
3232
&Mfield{Name: portKeyName, Ty: TypeInt32},
3333
)
@@ -52,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
5252
)
5353

5454
metadataBrokerV1 := NewSchema("metadata_broker_v1",
55-
&Mfield{Name: "node_id", Ty: TypeInt32},
55+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
5656
&Mfield{Name: hostKeyName, Ty: TypeStr},
5757
&Mfield{Name: portKeyName, Ty: TypeInt32},
5858
&Mfield{Name: "rack", Ty: TypeNullableStr},
5959
)
6060

6161
metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
62-
&Mfield{Name: "node_id", Ty: TypeInt32},
62+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
6363
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
6464
&Mfield{Name: portKeyName, Ty: TypeInt32},
6565
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
@@ -249,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {
249249

250250
func createFindCoordinatorResponseSchemaVersions() []Schema {
251251
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
252-
&Mfield{Name: "node_id", Ty: TypeInt32},
252+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
253253
&Mfield{Name: hostKeyName, Ty: TypeStr},
254254
&Mfield{Name: portKeyName, Ty: TypeInt32},
255255
)
256256

257257
findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
258-
&Mfield{Name: "node_id", Ty: TypeInt32},
258+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
259259
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
260260
&Mfield{Name: portKeyName, Ty: TypeInt32},
261261
)
@@ -341,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
341341
}
342342
}
343343
if port != newPort {
344-
err = broker.Replace(portKeyName, int32(newPort))
344+
err = broker.Replace(portKeyName, newPort)
345345
if err != nil {
346346
return err
347347
}

proxy/proxy.go

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/sirupsen/logrus"
1212
)
1313

14+
const UnknownBrokerId = -1
15+
1416
type ListenFunc func(cfg config.ListenerConfig) (l net.Listener, err error)
1517

1618
type Listeners struct {
@@ -25,13 +27,11 @@ type Listeners struct {
2527

2628
listenFunc ListenFunc
2729

28-
deterministicListeners bool
2930
disableDynamicListeners bool
3031
dynamicSequentialMinPort int
3132

32-
brokerToListenerConfig map[string]config.ListenerConfig
33-
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
34-
lock sync.RWMutex
33+
brokerToListenerConfig map[string]config.EnhancedListenerConfig
34+
lock sync.RWMutex
3535
}
3636

3737
func NewListeners(cfg *config.Config) (*Listeners, error) {
@@ -66,34 +66,33 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6666
return nil, err
6767
}
6868

69-
brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)
70-
7169
return &Listeners{
72-
defaultListenerIP: defaultListenerIP,
73-
dynamicAdvertisedListener: dynamicAdvertisedListener,
74-
connSrc: make(chan Conn, 1),
75-
brokerToListenerConfig: brokerToListenerConfig,
76-
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
77-
tcpConnOptions: tcpConnOptions,
78-
listenFunc: listenFunc,
79-
deterministicListeners: cfg.Proxy.DeterministicListeners,
80-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
81-
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
70+
defaultListenerIP: defaultListenerIP,
71+
dynamicAdvertisedListener: dynamicAdvertisedListener,
72+
connSrc: make(chan Conn, 1),
73+
brokerToListenerConfig: brokerToListenerConfig,
74+
tcpConnOptions: tcpConnOptions,
75+
listenFunc: listenFunc,
76+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
77+
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
8278
}, nil
8379
}
8480

85-
func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerConfig, error) {
86-
brokerToListenerConfig := make(map[string]config.ListenerConfig)
81+
func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.EnhancedListenerConfig, error) {
82+
brokerToListenerConfig := make(map[string]config.EnhancedListenerConfig)
8783

8884
for _, v := range cfg.Proxy.BootstrapServers {
8985
if lc, ok := brokerToListenerConfig[v.BrokerAddress]; ok {
9086
if lc.ListenerAddress != v.ListenerAddress || lc.AdvertisedAddress != v.AdvertisedAddress {
91-
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc)
87+
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc.ListenerConfig)
9288
}
9389
continue
9490
}
9591
logrus.Infof("Bootstrap server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
96-
brokerToListenerConfig[v.BrokerAddress] = v
92+
brokerToListenerConfig[v.BrokerAddress] = config.EnhancedListenerConfig{
93+
ListenerConfig: v,
94+
BrokerID: UnknownBrokerId,
95+
}
9796
}
9897

9998
externalToListenerConfig := make(map[string]config.ListenerConfig)
@@ -118,7 +117,10 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
118117
continue
119118
}
120119
logrus.Infof("External server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
121-
brokerToListenerConfig[v.BrokerAddress] = v
120+
brokerToListenerConfig[v.BrokerAddress] = config.EnhancedListenerConfig{
121+
ListenerConfig: v,
122+
BrokerID: UnknownBrokerId,
123+
}
122124
}
123125
return brokerToListenerConfig, nil
124126
}
@@ -132,26 +134,13 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, br
132134

133135
p.lock.RLock()
134136
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
135-
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
136137
p.lock.RUnlock()
137138

138139
if ok {
139-
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
140+
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s, brokerId=%d", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress, brokerId)
140141
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
141142
}
142143
if !p.disableDynamicListeners {
143-
if brokerIdFound {
144-
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
145-
// Existing broker ID found, but with a different upstream broker
146-
// Close existing listener, remove two mappings:
147-
// * ID to removed upstream broker
148-
// * removed upstream broker
149-
idListenerConfig.Listener.Close()
150-
p.lock.Lock()
151-
delete(p.brokerIdToIdListenerConfig, brokerId)
152-
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
153-
p.lock.Unlock()
154-
}
155144
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
156145
return p.ListenDynamicInstance(brokerAddress, brokerId)
157146
}
@@ -166,15 +155,9 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
166155
return util.SplitHostPort(v.AdvertisedAddress)
167156
}
168157

169-
var defaultListenerAddress string
170-
171-
if p.deterministicListeners {
172-
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
173-
} else {
174-
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
175-
if p.dynamicSequentialMinPort != 0 {
176-
p.dynamicSequentialMinPort += 1
177-
}
158+
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
159+
if p.dynamicSequentialMinPort != 0 {
160+
p.dynamicSequentialMinPort += 1
178161
}
179162

180163
cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
@@ -191,10 +174,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
191174
}
192175

193176
advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
194-
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
195-
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}
196-
197-
logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
177+
p.brokerToListenerConfig[brokerAddress] = config.EnhancedListenerConfig{
178+
ListenerConfig: config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress},
179+
BrokerID: brokerId,
180+
}
181+
logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", address, brokerAddress, brokerId, advertisedAddress)
198182

199183
return dynamicAdvertisedListener, int32(port), nil
200184
}

0 commit comments

Comments
 (0)