Skip to content

Commit 02202d1

Browse files
committed
prepare for rework
1 parent b8646e4 commit 02202d1

File tree

6 files changed

+119
-77
lines changed

6 files changed

+119
-77
lines changed

cmd/kafka-proxy/server.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ func initFlags() {
8989
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
9090
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
9191
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
92-
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
9392
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
9493
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
9594

config/config.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@ type ListenerConfig struct {
2929
ListenerAddress string
3030
AdvertisedAddress string
3131
}
32-
type IdListenerConfig struct {
33-
BrokerAddress string
34-
Listener net.Listener
35-
}
32+
3633
type DialAddressMapping struct {
3734
SourceAddress string
3835
DestinationAddress string
@@ -78,7 +75,6 @@ type Config struct {
7875
DefaultListenerIP string
7976
BootstrapServers []ListenerConfig
8077
ExternalServers []ListenerConfig
81-
DeterministicListeners bool
8278
DialAddressMappings []DialAddressMapping
8379
DisableDynamicListeners bool
8480
DynamicAdvertisedListener string

proxy/protocol/responses.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var (
2727

2828
func createMetadataResponseSchemaVersions() []Schema {
2929
metadataBrokerV0 := NewSchema("metadata_broker_v0",
30-
&Mfield{Name: "node_id", Ty: TypeInt32},
30+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
3131
&Mfield{Name: hostKeyName, Ty: TypeStr},
3232
&Mfield{Name: portKeyName, Ty: TypeInt32},
3333
)
@@ -52,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
5252
)
5353

5454
metadataBrokerV1 := NewSchema("metadata_broker_v1",
55-
&Mfield{Name: "node_id", Ty: TypeInt32},
55+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
5656
&Mfield{Name: hostKeyName, Ty: TypeStr},
5757
&Mfield{Name: portKeyName, Ty: TypeInt32},
5858
&Mfield{Name: "rack", Ty: TypeNullableStr},
5959
)
6060

6161
metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
62-
&Mfield{Name: "node_id", Ty: TypeInt32},
62+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
6363
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
6464
&Mfield{Name: portKeyName, Ty: TypeInt32},
6565
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
@@ -249,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {
249249

250250
func createFindCoordinatorResponseSchemaVersions() []Schema {
251251
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
252-
&Mfield{Name: "node_id", Ty: TypeInt32},
252+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
253253
&Mfield{Name: hostKeyName, Ty: TypeStr},
254254
&Mfield{Name: portKeyName, Ty: TypeInt32},
255255
)
256256

257257
findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
258-
&Mfield{Name: "node_id", Ty: TypeInt32},
258+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
259259
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
260260
&Mfield{Name: portKeyName, Ty: TypeInt32},
261261
)
@@ -341,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
341341
}
342342
}
343343
if port != newPort {
344-
err = broker.Replace(portKeyName, int32(newPort))
344+
err = broker.Replace(portKeyName, newPort)
345345
if err != nil {
346346
return err
347347
}

proxy/proxy.go

Lines changed: 44 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/sirupsen/logrus"
1212
)
1313

14-
type ListenFunc func(cfg config.ListenerConfig) (l net.Listener, err error)
14+
type ListenFunc func(cfg *ListenerConfig) (l net.Listener, err error)
1515

1616
type Listeners struct {
1717
// Source of new connections to Kafka broker.
@@ -25,13 +25,11 @@ type Listeners struct {
2525

2626
listenFunc ListenFunc
2727

28-
deterministicListeners bool
2928
disableDynamicListeners bool
3029
dynamicSequentialMinPort int
3130

32-
brokerToListenerConfig map[string]config.ListenerConfig
33-
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
34-
lock sync.RWMutex
31+
brokerToListenerConfig map[string]*ListenerConfig
32+
lock sync.RWMutex
3533
}
3634

3735
func NewListeners(cfg *config.Config) (*Listeners, error) {
@@ -54,7 +52,7 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
5452
}
5553
}
5654

57-
listenFunc := func(cfg config.ListenerConfig) (net.Listener, error) {
55+
listenFunc := func(cfg *ListenerConfig) (net.Listener, error) {
5856
if tlsConfig != nil {
5957
return tls.Listen("tcp", cfg.ListenerAddress, tlsConfig)
6058
}
@@ -66,34 +64,30 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6664
return nil, err
6765
}
6866

69-
brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)
70-
7167
return &Listeners{
72-
defaultListenerIP: defaultListenerIP,
73-
dynamicAdvertisedListener: dynamicAdvertisedListener,
74-
connSrc: make(chan Conn, 1),
75-
brokerToListenerConfig: brokerToListenerConfig,
76-
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
77-
tcpConnOptions: tcpConnOptions,
78-
listenFunc: listenFunc,
79-
deterministicListeners: cfg.Proxy.DeterministicListeners,
80-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
81-
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
68+
defaultListenerIP: defaultListenerIP,
69+
dynamicAdvertisedListener: dynamicAdvertisedListener,
70+
connSrc: make(chan Conn, 1),
71+
brokerToListenerConfig: brokerToListenerConfig,
72+
tcpConnOptions: tcpConnOptions,
73+
listenFunc: listenFunc,
74+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
75+
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
8276
}, nil
8377
}
8478

85-
func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerConfig, error) {
86-
brokerToListenerConfig := make(map[string]config.ListenerConfig)
79+
func getBrokerToListenerConfig(cfg *config.Config) (map[string]*ListenerConfig, error) {
80+
brokerToListenerConfig := make(map[string]*ListenerConfig)
8781

8882
for _, v := range cfg.Proxy.BootstrapServers {
8983
if lc, ok := brokerToListenerConfig[v.BrokerAddress]; ok {
9084
if lc.ListenerAddress != v.ListenerAddress || lc.AdvertisedAddress != v.AdvertisedAddress {
91-
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc)
85+
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc.ToListenerConfig())
9286
}
9387
continue
9488
}
9589
logrus.Infof("Bootstrap server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
96-
brokerToListenerConfig[v.BrokerAddress] = v
90+
brokerToListenerConfig[v.BrokerAddress] = FromListenerConfig(v)
9791
}
9892

9993
externalToListenerConfig := make(map[string]config.ListenerConfig)
@@ -118,7 +112,7 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
118112
continue
119113
}
120114
logrus.Infof("External server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
121-
brokerToListenerConfig[v.BrokerAddress] = v
115+
brokerToListenerConfig[v.BrokerAddress] = FromListenerConfig(v)
122116
}
123117
return brokerToListenerConfig, nil
124118
}
@@ -132,26 +126,13 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, br
132126

133127
p.lock.RLock()
134128
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
135-
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
136129
p.lock.RUnlock()
137130

138131
if ok {
139-
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
132+
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s, brokerId=%d", listenerConfig.GetBrokerAddress(), listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress, brokerId)
140133
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
141134
}
142135
if !p.disableDynamicListeners {
143-
if brokerIdFound {
144-
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
145-
// Existing broker ID found, but with a different upstream broker
146-
// Close existing listener, remove two mappings:
147-
// * ID to removed upstream broker
148-
// * removed upstream broker
149-
idListenerConfig.Listener.Close()
150-
p.lock.Lock()
151-
delete(p.brokerIdToIdListenerConfig, brokerId)
152-
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
153-
p.lock.Unlock()
154-
}
155136
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
156137
return p.ListenDynamicInstance(brokerAddress, brokerId)
157138
}
@@ -166,18 +147,13 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
166147
return util.SplitHostPort(v.AdvertisedAddress)
167148
}
168149

169-
var defaultListenerAddress string
170-
171-
if p.deterministicListeners {
172-
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
173-
} else {
174-
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
175-
if p.dynamicSequentialMinPort != 0 {
176-
p.dynamicSequentialMinPort += 1
177-
}
150+
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
151+
if p.dynamicSequentialMinPort != 0 {
152+
p.dynamicSequentialMinPort += 1
178153
}
154+
//TODO: brokerId >= 0 for deternmisitc listener dynamicSequentialMinPort + brokerId < 65535 (overflow)
179155

180-
cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
156+
cfg := NewListenerConfig(brokerAddress, defaultListenerAddress, "", brokerId)
181157
l, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc)
182158
if err != nil {
183159
return "", 0, err
@@ -189,12 +165,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
189165
if dynamicAdvertisedListener == "" {
190166
dynamicAdvertisedListener = p.defaultListenerIP
191167
}
168+
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
169+
cfg.ListenerAddress = address
192170

193-
advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
194-
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
195-
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}
196-
197-
logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
171+
p.brokerToListenerConfig[brokerAddress] = cfg
172+
logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), cfg.BrokerID, cfg.AdvertisedAddress)
198173

199174
return dynamicAdvertisedListener, int32(port), nil
200175
}
@@ -205,15 +180,16 @@ func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn,
205180

206181
// allows multiple local addresses to point to the remote
207182
for _, v := range cfgs {
208-
_, err := listenInstance(p.connSrc, v, p.tcpConnOptions, p.listenFunc)
183+
cfg := FromListenerConfig(v)
184+
_, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc)
209185
if err != nil {
210186
return nil, err
211187
}
212188
}
213189
return p.connSrc, nil
214190
}
215191

216-
func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOptions, listenFunc ListenFunc) (net.Listener, error) {
192+
func listenInstance(dst chan<- Conn, cfg *ListenerConfig, opts TCPConnOptions, listenFunc ListenFunc) (net.Listener, error) {
217193
l, err := listenFunc(cfg)
218194
if err != nil {
219195
return nil, err
@@ -222,20 +198,28 @@ func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOpti
222198
for {
223199
c, err := l.Accept()
224200
if err != nil {
225-
logrus.Infof("Error in accept for %q on %v: %v", cfg, cfg.ListenerAddress, err)
201+
logrus.Infof("Error in accept for %q on %v: %v", cfg.ToListenerConfig(), cfg.ListenerAddress, err)
226202
l.Close()
227203
return
228204
}
229205
if tcpConn, ok := c.(*net.TCPConn); ok {
230206
if err := opts.setTCPConnOptions(tcpConn); err != nil {
231-
logrus.Infof("WARNING: Error while setting TCP options for accepted connection %q on %v: %v", cfg, l.Addr().String(), err)
207+
logrus.Infof("WARNING: Error while setting TCP options for accepted connection %q on %v: %v", cfg.ToListenerConfig(), l.Addr().String(), err)
232208
}
233209
}
234-
logrus.Infof("New connection for %s", cfg.BrokerAddress)
235-
dst <- Conn{BrokerAddress: cfg.BrokerAddress, LocalConnection: c}
210+
brokerAddress := cfg.GetBrokerAddress()
211+
if cfg.BrokerID != UnknownBrokerID {
212+
logrus.Infof("New connection for %s brokerId %d", brokerAddress, cfg.BrokerID)
213+
} else {
214+
logrus.Infof("New connection for %s", brokerAddress)
215+
}
216+
dst <- Conn{BrokerAddress: brokerAddress, LocalConnection: c}
236217
}
237218
})
238-
239-
logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.BrokerAddress)
219+
if cfg.BrokerID != UnknownBrokerID {
220+
logrus.Infof("Listening on %s (%s) for remote %s broker %d", cfg.ListenerAddress, l.Addr().String(), cfg.GetBrokerAddress(), cfg.BrokerID)
221+
} else {
222+
logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.GetBrokerAddress())
223+
}
240224
return l, nil
241225
}

proxy/proxy_config.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package proxy
2+
3+
import (
4+
"github.com/grepplabs/kafka-proxy/config"
5+
"sync/atomic"
6+
)
7+
8+
const UnknownBrokerID = -1
9+
10+
type ListenerConfig struct {
11+
BrokerAddressPtr atomic.Pointer[string]
12+
ListenerAddress string
13+
AdvertisedAddress string
14+
BrokerID int32
15+
}
16+
17+
func FromListenerConfig(listenerConfig config.ListenerConfig) *ListenerConfig {
18+
c := &ListenerConfig{
19+
ListenerAddress: listenerConfig.ListenerAddress,
20+
AdvertisedAddress: listenerConfig.AdvertisedAddress,
21+
BrokerID: UnknownBrokerID,
22+
}
23+
c.BrokerAddressPtr.Store(&listenerConfig.BrokerAddress)
24+
return c
25+
}
26+
27+
func NewListenerConfig(brokerAddress, listenerAddress, advertisedAddress string, brokerID int32) *ListenerConfig {
28+
c := &ListenerConfig{
29+
ListenerAddress: listenerAddress,
30+
AdvertisedAddress: advertisedAddress,
31+
BrokerID: brokerID,
32+
}
33+
c.BrokerAddressPtr.Store(&brokerAddress)
34+
return c
35+
}
36+
func (c *ListenerConfig) ToListenerConfig() config.ListenerConfig {
37+
return config.ListenerConfig{
38+
BrokerAddress: c.GetBrokerAddress(),
39+
ListenerAddress: c.ListenerAddress,
40+
AdvertisedAddress: c.AdvertisedAddress,
41+
}
42+
}
43+
44+
func (c *ListenerConfig) GetBrokerAddress() string {
45+
addressPtr := c.BrokerAddressPtr.Load()
46+
if addressPtr == nil {
47+
return ""
48+
}
49+
return *addressPtr
50+
}
51+
52+
func (c *ListenerConfig) SetBrokerAddress(address string) {
53+
c.BrokerAddressPtr.Store(&address)
54+
}

proxy/proxy_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ func TestGetBrokerToListenerConfig(t *testing.T) {
162162
{
163163
BrokerAddress: "192.168.99.100:32404",
164164
ListenerAddress: "kafka-proxy-0:32404",
165-
AdvertisedAddress: "kafka-proxy-0:32404"},
165+
AdvertisedAddress: "kafka-proxy-0:32404",
166+
},
166167
},
167168
nil,
168169
map[string]config.ListenerConfig{
@@ -198,8 +199,7 @@ func TestGetBrokerToListenerConfig(t *testing.T) {
198199
{
199200
BrokerAddress: "192.168.99.100:32400",
200201
ListenerAddress: "0.0.0.0:32400",
201-
AdvertisedAddress: "kafka-proxy-0:32400",
202-
},
202+
AdvertisedAddress: "kafka-proxy-0:32400"},
203203
},
204204
[]config.ListenerConfig{
205205
{
@@ -269,8 +269,17 @@ func TestGetBrokerToListenerConfig(t *testing.T) {
269269
c := &config.Config{}
270270
c.Proxy.BootstrapServers = tt.bootstrapServers
271271
c.Proxy.ExternalServers = tt.externalServers
272-
mapping, err := getBrokerToListenerConfig(c)
272+
brokerToListenerConfig, err := getBrokerToListenerConfig(c)
273273
a.Equal(tt.err, err)
274-
a.Equal(tt.mapping, mapping)
274+
275+
mapping := make(map[string]config.ListenerConfig)
276+
for k, v := range brokerToListenerConfig {
277+
mapping[k] = config.ListenerConfig{
278+
BrokerAddress: v.GetBrokerAddress(),
279+
ListenerAddress: v.ListenerAddress,
280+
AdvertisedAddress: v.AdvertisedAddress,
281+
}
282+
}
283+
assert.ObjectsAreEqual(tt.mapping, mapping)
275284
}
276285
}

0 commit comments

Comments
 (0)