Skip to content

Commit 93770ec

Browse files
committed
prepare for rework
1 parent b8646e4 commit 93770ec

File tree

5 files changed

+147
-74
lines changed

5 files changed

+147
-74
lines changed

config/config.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@ type ListenerConfig struct {
2929
ListenerAddress string
3030
AdvertisedAddress string
3131
}
32-
type IdListenerConfig struct {
33-
BrokerAddress string
34-
Listener net.Listener
35-
}
32+
3633
type DialAddressMapping struct {
3734
SourceAddress string
3835
DestinationAddress string

proxy/protocol/responses.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var (
2727

2828
func createMetadataResponseSchemaVersions() []Schema {
2929
metadataBrokerV0 := NewSchema("metadata_broker_v0",
30-
&Mfield{Name: "node_id", Ty: TypeInt32},
30+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
3131
&Mfield{Name: hostKeyName, Ty: TypeStr},
3232
&Mfield{Name: portKeyName, Ty: TypeInt32},
3333
)
@@ -52,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
5252
)
5353

5454
metadataBrokerV1 := NewSchema("metadata_broker_v1",
55-
&Mfield{Name: "node_id", Ty: TypeInt32},
55+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
5656
&Mfield{Name: hostKeyName, Ty: TypeStr},
5757
&Mfield{Name: portKeyName, Ty: TypeInt32},
5858
&Mfield{Name: "rack", Ty: TypeNullableStr},
5959
)
6060

6161
metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
62-
&Mfield{Name: "node_id", Ty: TypeInt32},
62+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
6363
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
6464
&Mfield{Name: portKeyName, Ty: TypeInt32},
6565
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
@@ -249,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {
249249

250250
func createFindCoordinatorResponseSchemaVersions() []Schema {
251251
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
252-
&Mfield{Name: "node_id", Ty: TypeInt32},
252+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
253253
&Mfield{Name: hostKeyName, Ty: TypeStr},
254254
&Mfield{Name: portKeyName, Ty: TypeInt32},
255255
)
256256

257257
findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
258-
&Mfield{Name: "node_id", Ty: TypeInt32},
258+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
259259
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
260260
&Mfield{Name: portKeyName, Ty: TypeInt32},
261261
)
@@ -341,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
341341
}
342342
}
343343
if port != newPort {
344-
err = broker.Replace(portKeyName, int32(newPort))
344+
err = broker.Replace(portKeyName, newPort)
345345
if err != nil {
346346
return err
347347
}

proxy/proxy.go

Lines changed: 72 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import (
44
"crypto/tls"
55
"fmt"
66
"net"
7+
"strconv"
78
"sync"
89

910
"github.com/grepplabs/kafka-proxy/config"
1011
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
1112
"github.com/sirupsen/logrus"
1213
)
1314

14-
type ListenFunc func(cfg config.ListenerConfig) (l net.Listener, err error)
15+
type ListenFunc func(cfg *ListenerConfig) (l net.Listener, err error)
1516

1617
type Listeners struct {
1718
// Source of new connections to Kafka broker.
@@ -29,16 +30,11 @@ type Listeners struct {
2930
disableDynamicListeners bool
3031
dynamicSequentialMinPort int
3132

32-
brokerToListenerConfig map[string]config.ListenerConfig
33-
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
34-
lock sync.RWMutex
33+
brokerToListenerConfig map[string]*ListenerConfig
34+
lock sync.RWMutex
3535
}
3636

3737
func NewListeners(cfg *config.Config) (*Listeners, error) {
38-
39-
defaultListenerIP := cfg.Proxy.DefaultListenerIP
40-
dynamicAdvertisedListener := cfg.Proxy.DynamicAdvertisedListener
41-
4238
tcpConnOptions := TCPConnOptions{
4339
KeepAlive: cfg.Proxy.ListenerKeepAlive,
4440
ReadBufferSize: cfg.Proxy.ListenerReadBufferSize,
@@ -54,7 +50,7 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
5450
}
5551
}
5652

57-
listenFunc := func(cfg config.ListenerConfig) (net.Listener, error) {
53+
listenFunc := func(cfg *ListenerConfig) (net.Listener, error) {
5854
if tlsConfig != nil {
5955
return tls.Listen("tcp", cfg.ListenerAddress, tlsConfig)
6056
}
@@ -66,34 +62,31 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6662
return nil, err
6763
}
6864

69-
brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)
70-
7165
return &Listeners{
72-
defaultListenerIP: defaultListenerIP,
73-
dynamicAdvertisedListener: dynamicAdvertisedListener,
74-
connSrc: make(chan Conn, 1),
75-
brokerToListenerConfig: brokerToListenerConfig,
76-
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
77-
tcpConnOptions: tcpConnOptions,
78-
listenFunc: listenFunc,
79-
deterministicListeners: cfg.Proxy.DeterministicListeners,
80-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
81-
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
66+
defaultListenerIP: cfg.Proxy.DefaultListenerIP,
67+
dynamicAdvertisedListener: cfg.Proxy.DynamicAdvertisedListener,
68+
connSrc: make(chan Conn, 1),
69+
brokerToListenerConfig: brokerToListenerConfig,
70+
tcpConnOptions: tcpConnOptions,
71+
listenFunc: listenFunc,
72+
deterministicListeners: cfg.Proxy.DeterministicListeners,
73+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
74+
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
8275
}, nil
8376
}
8477

85-
func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerConfig, error) {
86-
brokerToListenerConfig := make(map[string]config.ListenerConfig)
78+
func getBrokerToListenerConfig(cfg *config.Config) (map[string]*ListenerConfig, error) {
79+
brokerToListenerConfig := make(map[string]*ListenerConfig)
8780

8881
for _, v := range cfg.Proxy.BootstrapServers {
8982
if lc, ok := brokerToListenerConfig[v.BrokerAddress]; ok {
9083
if lc.ListenerAddress != v.ListenerAddress || lc.AdvertisedAddress != v.AdvertisedAddress {
91-
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc)
84+
return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc.ToListenerConfig())
9285
}
9386
continue
9487
}
9588
logrus.Infof("Bootstrap server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
96-
brokerToListenerConfig[v.BrokerAddress] = v
89+
brokerToListenerConfig[v.BrokerAddress] = FromListenerConfig(v)
9790
}
9891

9992
externalToListenerConfig := make(map[string]config.ListenerConfig)
@@ -118,7 +111,7 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
118111
continue
119112
}
120113
logrus.Infof("External server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
121-
brokerToListenerConfig[v.BrokerAddress] = v
114+
brokerToListenerConfig[v.BrokerAddress] = FromListenerConfig(v)
122115
}
123116
return brokerToListenerConfig, nil
124117
}
@@ -132,32 +125,28 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, br
132125

133126
p.lock.RLock()
134127
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
135-
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
136128
p.lock.RUnlock()
137129

138130
if ok {
139-
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
131+
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s, brokerId=%d", listenerConfig.GetBrokerAddress(), listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress, brokerId)
140132
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
141133
}
142134
if !p.disableDynamicListeners {
143-
if brokerIdFound {
144-
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
145-
// Existing broker ID found, but with a different upstream broker
146-
// Close existing listener, remove two mappings:
147-
// * ID to removed upstream broker
148-
// * removed upstream broker
149-
idListenerConfig.Listener.Close()
150-
p.lock.Lock()
151-
delete(p.brokerIdToIdListenerConfig, brokerId)
152-
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
153-
p.lock.Unlock()
154-
}
155135
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
156136
return p.ListenDynamicInstance(brokerAddress, brokerId)
157137
}
158138
return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort)
159139
}
160140

141+
func (p *Listeners) findListenerConfig(brokerId int32) *ListenerConfig {
142+
for _, listenerConfig := range p.brokerToListenerConfig {
143+
if listenerConfig.BrokerID == brokerId {
144+
return listenerConfig
145+
}
146+
}
147+
return nil
148+
}
149+
161150
func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) {
162151
p.lock.Lock()
163152
defer p.lock.Unlock()
@@ -166,18 +155,34 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
166155
return util.SplitHostPort(v.AdvertisedAddress)
167156
}
168157

169-
var defaultListenerAddress string
170-
158+
var listenerAddress string
171159
if p.deterministicListeners {
172-
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
160+
if brokerId < 0 {
161+
return "", 0, fmt.Errorf("brokerId is negative %s %d", brokerAddress, brokerId)
162+
}
163+
deterministicPort := p.dynamicSequentialMinPort + int(brokerId)
164+
if deterministicPort < p.dynamicSequentialMinPort {
165+
return "", 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, deterministicPort)
166+
}
167+
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(deterministicPort))
168+
cfg := p.findListenerConfig(brokerId)
169+
if cfg != nil {
170+
oldBrokerAddress := cfg.GetBrokerAddress()
171+
if oldBrokerAddress != brokerAddress {
172+
delete(p.brokerToListenerConfig, oldBrokerAddress)
173+
cfg.SetBrokerAddress(brokerAddress)
174+
p.brokerToListenerConfig[brokerAddress] = cfg
175+
logrus.Infof("Broker address changed listener %s for new address %s old address %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), oldBrokerAddress, cfg.BrokerID, cfg.AdvertisedAddress)
176+
}
177+
return util.SplitHostPort(cfg.AdvertisedAddress)
178+
}
173179
} else {
174-
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
180+
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(p.dynamicSequentialMinPort))
175181
if p.dynamicSequentialMinPort != 0 {
176182
p.dynamicSequentialMinPort += 1
177183
}
178184
}
179-
180-
cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
185+
cfg := NewListenerConfig(brokerAddress, listenerAddress, "", brokerId)
181186
l, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc)
182187
if err != nil {
183188
return "", 0, err
@@ -189,12 +194,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
189194
if dynamicAdvertisedListener == "" {
190195
dynamicAdvertisedListener = p.defaultListenerIP
191196
}
197+
cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
198+
cfg.ListenerAddress = address
192199

193-
advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
194-
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
195-
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}
196-
197-
logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
200+
p.brokerToListenerConfig[brokerAddress] = cfg
201+
logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), cfg.BrokerID, cfg.AdvertisedAddress)
198202

199203
return dynamicAdvertisedListener, int32(port), nil
200204
}
@@ -205,15 +209,16 @@ func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn,
205209

206210
// allows multiple local addresses to point to the remote
207211
for _, v := range cfgs {
208-
_, err := listenInstance(p.connSrc, v, p.tcpConnOptions, p.listenFunc)
212+
cfg := FromListenerConfig(v)
213+
_, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc)
209214
if err != nil {
210215
return nil, err
211216
}
212217
}
213218
return p.connSrc, nil
214219
}
215220

216-
func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOptions, listenFunc ListenFunc) (net.Listener, error) {
221+
func listenInstance(dst chan<- Conn, cfg *ListenerConfig, opts TCPConnOptions, listenFunc ListenFunc) (net.Listener, error) {
217222
l, err := listenFunc(cfg)
218223
if err != nil {
219224
return nil, err
@@ -222,20 +227,28 @@ func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOpti
222227
for {
223228
c, err := l.Accept()
224229
if err != nil {
225-
logrus.Infof("Error in accept for %q on %v: %v", cfg, cfg.ListenerAddress, err)
230+
logrus.Infof("Error in accept for %q on %v: %v", cfg.ToListenerConfig(), cfg.ListenerAddress, err)
226231
l.Close()
227232
return
228233
}
229234
if tcpConn, ok := c.(*net.TCPConn); ok {
230235
if err := opts.setTCPConnOptions(tcpConn); err != nil {
231-
logrus.Infof("WARNING: Error while setting TCP options for accepted connection %q on %v: %v", cfg, l.Addr().String(), err)
236+
logrus.Infof("WARNING: Error while setting TCP options for accepted connection %q on %v: %v", cfg.ToListenerConfig(), l.Addr().String(), err)
232237
}
233238
}
234-
logrus.Infof("New connection for %s", cfg.BrokerAddress)
235-
dst <- Conn{BrokerAddress: cfg.BrokerAddress, LocalConnection: c}
239+
brokerAddress := cfg.GetBrokerAddress()
240+
if cfg.BrokerID != UnknownBrokerID {
241+
logrus.Infof("New connection for %s brokerId %d", brokerAddress, cfg.BrokerID)
242+
} else {
243+
logrus.Infof("New connection for %s", brokerAddress)
244+
}
245+
dst <- Conn{BrokerAddress: brokerAddress, LocalConnection: c}
236246
}
237247
})
238-
239-
logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.BrokerAddress)
248+
if cfg.BrokerID != UnknownBrokerID {
249+
logrus.Infof("Listening on %s (%s) for remote %s broker %d", cfg.ListenerAddress, l.Addr().String(), cfg.GetBrokerAddress(), cfg.BrokerID)
250+
} else {
251+
logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.GetBrokerAddress())
252+
}
240253
return l, nil
241254
}

proxy/proxy_config.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package proxy
2+
3+
import (
4+
"github.com/grepplabs/kafka-proxy/config"
5+
"sync/atomic"
6+
)
7+
8+
const UnknownBrokerID = -1
9+
10+
type ListenerConfig struct {
11+
BrokerAddressPtr atomic.Pointer[string]
12+
ListenerAddress string
13+
AdvertisedAddress string
14+
BrokerID int32
15+
}
16+
17+
func FromListenerConfig(listenerConfig config.ListenerConfig) *ListenerConfig {
18+
c := &ListenerConfig{
19+
ListenerAddress: listenerConfig.ListenerAddress,
20+
AdvertisedAddress: listenerConfig.AdvertisedAddress,
21+
BrokerID: UnknownBrokerID,
22+
}
23+
c.BrokerAddressPtr.Store(&listenerConfig.BrokerAddress)
24+
return c
25+
}
26+
27+
func NewListenerConfig(brokerAddress, listenerAddress, advertisedAddress string, brokerID int32) *ListenerConfig {
28+
c := &ListenerConfig{
29+
ListenerAddress: listenerAddress,
30+
AdvertisedAddress: advertisedAddress,
31+
BrokerID: brokerID,
32+
}
33+
c.BrokerAddressPtr.Store(&brokerAddress)
34+
return c
35+
}
36+
func (c *ListenerConfig) ToListenerConfig() config.ListenerConfig {
37+
return config.ListenerConfig{
38+
BrokerAddress: c.GetBrokerAddress(),
39+
ListenerAddress: c.ListenerAddress,
40+
AdvertisedAddress: c.AdvertisedAddress,
41+
}
42+
}
43+
44+
func (c *ListenerConfig) GetBrokerAddress() string {
45+
addressPtr := c.BrokerAddressPtr.Load()
46+
if addressPtr == nil {
47+
return ""
48+
}
49+
return *addressPtr
50+
}
51+
52+
func (c *ListenerConfig) SetBrokerAddress(address string) {
53+
c.BrokerAddressPtr.Store(&address)
54+
}

0 commit comments

Comments
 (0)