Skip to content

Commit 91e8234

Browse files
committed
Refactor MQTT (un)subscribe.
This separates the SetGatewaySubscription from the actual MQTT (un)subscribe. When SetGatewaySubscription is called multiple times for the same gateway while the MQTT client is disconnected, `gateways` will always have the desired state of gateway for which a subscription must exist. A separate handles handles the MQTT (un)subscribe and stores this state into `gatewaysSubscribed`. Previously when disconnected, the SetGatewaySubscription in case the MQTT client was disconnected. When SetGatewaySubscription was called multiple times for the same gateway to unsubscribe and subscribe, there was a risk that it was not executed in the correct order, leaving the gateway disconnected. Closes #183.
1 parent 81baadf commit 91e8234

File tree

1 file changed

+88
-52
lines changed

1 file changed

+88
-52
lines changed

internal/integration/mqtt/backend.go

Lines changed: 88 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,22 @@ import (
2323

2424
// Backend implements a MQTT backend.
2525
type Backend struct {
26-
sync.RWMutex
26+
auth auth.Authentication
2727

28-
auth auth.Authentication
2928
conn paho.Client
30-
closed bool
29+
connMux sync.RWMutex
30+
connClosed bool
3131
clientOpts *paho.ClientOptions
3232

3333
downlinkFrameFunc func(gw.DownlinkFrame)
3434
gatewayConfigurationFunc func(gw.GatewayConfiguration)
3535
gatewayCommandExecRequestFunc func(gw.GatewayCommandExecRequest)
3636
rawPacketForwarderCommandFunc func(gw.RawPacketForwarderCommand)
3737

38+
gatewaysMux sync.RWMutex
3839
gateways map[lorawan.EUI64]struct{}
40+
gatewaysSubscribedMux sync.Mutex
41+
gatewaysSubscribed map[lorawan.EUI64]struct{}
3942
terminateOnConnectError bool
4043

4144
qos uint8
@@ -55,6 +58,7 @@ func NewBackend(conf config.Config) (*Backend, error) {
5558
terminateOnConnectError: conf.Integration.MQTT.TerminateOnConnectError,
5659
clientOpts: paho.NewClientOptions(),
5760
gateways: make(map[lorawan.EUI64]struct{}),
61+
gatewaysSubscribed: make(map[lorawan.EUI64]struct{}),
5862
}
5963

6064
switch conf.Integration.MQTT.Auth.Type {
@@ -140,16 +144,17 @@ func NewBackend(conf config.Config) (*Backend, error) {
140144
func (b *Backend) Start() error {
141145
b.connectLoop()
142146
go b.reconnectLoop()
147+
go b.subscribeLoop()
143148
return nil
144149
}
145150

146151
// Stop stops the integration.
147152
func (b *Backend) Stop() error {
148-
b.Lock()
149-
b.closed = true
150-
b.Unlock()
153+
b.connMux.Lock()
154+
defer b.connMux.Unlock()
151155

152156
b.conn.Disconnect(250)
157+
b.connClosed = true
153158
return nil
154159
}
155160

@@ -173,45 +178,23 @@ func (b *Backend) SetRawPacketForwarderCommandFunc(f func(gw.RawPacketForwarderC
173178
b.rawPacketForwarderCommandFunc = f
174179
}
175180

176-
// SetGatewaySubscription (un)subscribes the given gateway.
181+
// SetGatewaySubscription sets or unsets the gateway.
182+
// Note: the actual MQTT (un)subscribe happens in a separate function to avoid
183+
// race conditions in case of connection issues. This way, the gateways map
184+
// always reflect the desired state.
177185
func (b *Backend) SetGatewaySubscription(subscribe bool, gatewayID lorawan.EUI64) error {
178-
b.Lock()
179-
defer b.Unlock()
180-
181186
log.WithFields(log.Fields{
182187
"gateway_id": gatewayID,
183188
"subscribe": subscribe,
184-
}).Debug("integration/mqtt: set gateway subscription called")
185-
186-
_, ok := b.gateways[gatewayID]
187-
if ok == subscribe {
188-
return nil
189-
}
190-
191-
for {
192-
if subscribe {
193-
if err := b.subscribeGateway(gatewayID); err != nil {
194-
log.WithError(err).WithFields(log.Fields{
195-
"gateway_id": gatewayID,
196-
}).Error("integration/mqtt: subscribe gateway error")
197-
time.Sleep(time.Second)
198-
continue
199-
}
200-
201-
b.gateways[gatewayID] = struct{}{}
202-
} else {
203-
if err := b.unsubscribeGateway(gatewayID); err != nil {
204-
log.WithError(err).WithFields(log.Fields{
205-
"gateway_id": gatewayID,
206-
}).Error("integration/mqtt: unsubscribe gateway error")
207-
time.Sleep(time.Second)
208-
continue
209-
}
189+
}).Debug("integration/mqtt: set gateway subscription")
210190

211-
delete(b.gateways, gatewayID)
212-
}
191+
b.gatewaysMux.Lock()
192+
defer b.gatewaysMux.Unlock()
213193

214-
break
194+
if subscribe {
195+
b.gateways[gatewayID] = struct{}{}
196+
} else {
197+
delete(b.gateways, gatewayID)
215198
}
216199

217200
return nil
@@ -265,8 +248,8 @@ func (b *Backend) PublishEvent(gatewayID lorawan.EUI64, event string, id uuid.UU
265248
}
266249

267250
func (b *Backend) connect() error {
268-
b.Lock()
269-
defer b.Unlock()
251+
b.connMux.Lock()
252+
defer b.connMux.Unlock()
270253

271254
if err := b.auth.Update(b.clientOpts); err != nil {
272255
return errors.Wrap(err, "integration/mqtt: update authentication error")
@@ -300,8 +283,8 @@ func (b *Backend) connectLoop() {
300283
func (b *Backend) disconnect() error {
301284
mqttDisconnectCounter().Inc()
302285

303-
b.Lock()
304-
defer b.Unlock()
286+
b.connMux.Lock()
287+
defer b.connMux.Unlock()
305288

306289
b.conn.Disconnect(250)
307290
return nil
@@ -310,7 +293,11 @@ func (b *Backend) disconnect() error {
310293
func (b *Backend) reconnectLoop() {
311294
if b.auth.ReconnectAfter() > 0 {
312295
for {
313-
if b.closed {
296+
b.connMux.RLock()
297+
closed := b.connClosed
298+
b.connMux.RUnlock()
299+
300+
if closed {
314301
break
315302
}
316303
time.Sleep(b.auth.ReconnectAfter())
@@ -326,22 +313,71 @@ func (b *Backend) reconnectLoop() {
326313

327314
func (b *Backend) onConnected(c paho.Client) {
328315
mqttConnectCounter().Inc()
316+
log.Info("integration/mqtt: connected to mqtt broker")
329317

330-
b.RLock()
331-
defer b.RUnlock()
318+
b.gatewaysSubscribedMux.Lock()
319+
defer b.gatewaysSubscribedMux.Unlock()
332320

333-
log.Info("integration/mqtt: connected to mqtt broker")
321+
// reset the subscriptions as we have a new connection
322+
// note: this is done in the onConnected function because the subscribeLoop
323+
// locks the gatewaysSubscribedMux and will only release it after all
324+
// (un)subscribe operations have been completed. If it would be done in the
325+
// onConnectionLost function, the function could block until the connection
326+
// is restored because the (un)subscribe operations will block until then.
327+
b.gatewaysSubscribed = make(map[lorawan.EUI64]struct{})
328+
}
334329

335-
for gatewayID := range b.gateways {
336-
for {
330+
func (b *Backend) subscribeLoop() {
331+
for {
332+
b.connMux.RLock()
333+
closed := b.connClosed
334+
b.connMux.RUnlock()
335+
if closed {
336+
break
337+
}
338+
339+
var subscribe []lorawan.EUI64
340+
var unsubscribe []lorawan.EUI64
341+
342+
b.gatewaysMux.RLock()
343+
b.gatewaysSubscribedMux.Lock()
344+
345+
// subscribe
346+
for gatewayID := range b.gateways {
347+
if _, ok := b.gatewaysSubscribed[gatewayID]; !ok {
348+
subscribe = append(subscribe, gatewayID)
349+
}
350+
}
351+
352+
// unsubscribe
353+
for gatewayID := range b.gatewaysSubscribed {
354+
if _, ok := b.gateways[gatewayID]; !ok {
355+
unsubscribe = append(unsubscribe, gatewayID)
356+
}
357+
}
358+
359+
// unlock gatewaysMux so that SetGatewaySubscription can write again
360+
// to the map, in which case changes are picked up in the next run
361+
b.gatewaysMux.RUnlock()
362+
363+
for _, gatewayID := range subscribe {
337364
if err := b.subscribeGateway(gatewayID); err != nil {
338365
log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: subscribe gateway error")
339-
time.Sleep(time.Second)
340-
continue
366+
} else {
367+
b.gatewaysSubscribed[gatewayID] = struct{}{}
341368
}
369+
}
342370

343-
break
371+
for _, gatewayID := range unsubscribe {
372+
if err := b.unsubscribeGateway(gatewayID); err != nil {
373+
log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: unsubscribe gateway error")
374+
} else {
375+
delete(b.gatewaysSubscribed, gatewayID)
376+
}
344377
}
378+
379+
b.gatewaysSubscribedMux.Unlock()
380+
time.Sleep(time.Millisecond * 100)
345381
}
346382
}
347383

0 commit comments

Comments
 (0)