Skip to content

Commit 9fb669f

Browse files
committed
Add lock on map update; change second map to use new type
1 parent b930b9d commit 9fb669f

File tree

2 files changed

+25
-19
lines changed

2 files changed

+25
-19
lines changed

config/config.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ type ListenerConfig struct {
2828
BrokerAddress string
2929
ListenerAddress string
3030
AdvertisedAddress string
31-
Listener net.Listener
31+
}
32+
type IdListenerConfig struct {
33+
BrokerAddress string
34+
Listener net.Listener
3235
}
3336
type DialAddressMapping struct {
3437
SourceAddress string

proxy/proxy.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ type Listeners struct {
2929
disableDynamicListeners bool
3030
dynamicSequentialMinPort int
3131

32-
brokerToListenerConfig map[string]config.ListenerConfig
33-
brokerIdToListenerConfig map[int32]config.ListenerConfig
34-
lock sync.RWMutex
32+
brokerToListenerConfig map[string]config.ListenerConfig
33+
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
34+
lock sync.RWMutex
3535
}
3636

3737
func NewListeners(cfg *config.Config) (*Listeners, error) {
@@ -66,19 +66,19 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
6666
return nil, err
6767
}
6868

69-
brokerIdToListenerConfig := make(map[int32]config.ListenerConfig)
69+
brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)
7070

7171
return &Listeners{
72-
defaultListenerIP: defaultListenerIP,
73-
dynamicAdvertisedListener: dynamicAdvertisedListener,
74-
connSrc: make(chan Conn, 1),
75-
brokerToListenerConfig: brokerToListenerConfig,
76-
brokerIdToListenerConfig: brokerIdToListenerConfig,
77-
tcpConnOptions: tcpConnOptions,
78-
listenFunc: listenFunc,
79-
deterministicListeners: cfg.Proxy.DeterministicListeners,
80-
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
81-
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
72+
defaultListenerIP: defaultListenerIP,
73+
dynamicAdvertisedListener: dynamicAdvertisedListener,
74+
connSrc: make(chan Conn, 1),
75+
brokerToListenerConfig: brokerToListenerConfig,
76+
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
77+
tcpConnOptions: tcpConnOptions,
78+
listenFunc: listenFunc,
79+
deterministicListeners: cfg.Proxy.DeterministicListeners,
80+
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
81+
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
8282
}, nil
8383
}
8484

@@ -132,7 +132,7 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, br
132132

133133
p.lock.RLock()
134134
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
135-
idListenerConfig, brokerIdFound := p.brokerIdToListenerConfig[brokerId]
135+
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
136136
p.lock.RUnlock()
137137

138138
if ok {
@@ -141,13 +141,16 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, br
141141
}
142142
if !p.disableDynamicListeners {
143143
if brokerIdFound {
144+
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
144145
// Existing broker ID found, but with a different upstream broker
145146
// Close existing listener, remove two mappings:
146147
// * ID to removed upstream broker
147148
// * removed upstream broker
148149
idListenerConfig.Listener.Close()
149-
delete(p.brokerIdToListenerConfig, brokerId)
150+
p.lock.Lock()
151+
delete(p.brokerIdToIdListenerConfig, brokerId)
150152
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
153+
p.lock.Unlock()
151154
}
152155
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
153156
return p.ListenDynamicInstance(brokerAddress, brokerId)
@@ -188,8 +191,8 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
188191
}
189192

190193
advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
191-
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress, Listener: l}
192-
p.brokerIdToListenerConfig[brokerId] = p.brokerToListenerConfig[brokerAddress]
194+
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
195+
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}
193196

194197
logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
195198

0 commit comments

Comments
 (0)