@@ -3,6 +3,8 @@ package mqttpubsub
33import (
44 "encoding/json"
55 "fmt"
6+ "sync"
7+ "time"
68
79 log "github.com/Sirupsen/logrus"
810 "github.com/brocaar/loraserver/models"
@@ -14,19 +16,23 @@ import (
1416type Backend struct {
1517 conn mqtt.Client
1618 txPacketChan chan models.TXPacket
19+ gateways map [lorawan.EUI64 ]struct {}
20+ mutex sync.RWMutex
1721}
1822
1923// NewBackend creates a new Backend.
2024func NewBackend (server , username , password string ) (* Backend , error ) {
2125 b := Backend {
2226 txPacketChan : make (chan models.TXPacket ),
27+ gateways : make (map [lorawan.EUI64 ]struct {}),
2328 }
2429
2530 opts := mqtt .NewClientOptions ()
2631 opts .AddBroker (server )
2732 opts .SetUsername (username )
2833 opts .SetPassword (password )
29- opts .SetClientID ("lora-semtech-bridge" )
34+ opts .SetOnConnectHandler (b .onConnected )
35+ opts .SetConnectionLostHandler (b .onConnectionLost )
3036
3137 log .WithField ("server" , server ).Info ("backend/mqttpubsub: connecting to MQTT server" )
3238 b .conn = mqtt .NewClient (opts )
@@ -50,22 +56,30 @@ func (b *Backend) TXPacketChan() chan models.TXPacket {
5056// SubscribeGatewayTX subscribes the backend to the gateway TXPacket
5157// topic (packets the gateway needs to transmit).
5258func (b * Backend ) SubscribeGatewayTX (mac lorawan.EUI64 ) error {
59+ defer b .mutex .Unlock ()
60+ b .mutex .Lock ()
61+
5362 topic := fmt .Sprintf ("gateway/%s/tx" , mac .String ())
5463 log .WithField ("topic" , topic ).Info ("backend/mqttpubsub: subscribing to topic" )
5564 if token := b .conn .Subscribe (topic , 0 , b .txPacketHandler ); token .Wait () && token .Error () != nil {
5665 return token .Error ()
5766 }
67+ b .gateways [mac ] = struct {}{}
5868 return nil
5969}
6070
6171// UnSubscribeGatewayTX unsubscribes the backend from the gateway TXPacket
6272// topic.
6373func (b * Backend ) UnSubscribeGatewayTX (mac lorawan.EUI64 ) error {
74+ defer b .mutex .Unlock ()
75+ b .mutex .Lock ()
76+
6477 topic := fmt .Sprintf ("gateway/%s/tx" , mac .String ())
6578 log .WithField ("topic" , topic ).Info ("backend/mqttpubsub: unsubscribing from topic" )
6679 if token := b .conn .Unsubscribe (topic ); token .Wait () && token .Error () != nil {
6780 return token .Error ()
6881 }
82+ delete (b .gateways , mac )
6983 return nil
7084}
7185
@@ -102,3 +116,29 @@ func (b *Backend) txPacketHandler(c mqtt.Client, msg mqtt.Message) {
102116 }
103117 b .txPacketChan <- txPacket
104118}
119+
120+ func (b * Backend ) onConnected (c mqtt.Client ) {
121+ defer b .mutex .RUnlock ()
122+ b .mutex .RLock ()
123+
124+ log .Info ("backend/mqttpubsub: connected to mqtt server" )
125+ if len (b .gateways ) > 0 {
126+ for {
127+ log .WithField ("topic_count" , len (b .gateways )).Info ("Backend/mqttpubsub: re-registering to gateway topics" )
128+ topics := make (map [string ]byte )
129+ for k := range b .gateways {
130+ topics [fmt .Sprintf ("gateway/%s/tx" , k )] = 0
131+ }
132+ if token := b .conn .SubscribeMultiple (topics , b .txPacketHandler ); token .Wait () && token .Error () != nil {
133+ log .WithField ("topic_count" , len (topics )).Errorf ("backend/mqttpubsub: subscribe multiple failed: %s" , token .Error ())
134+ time .Sleep (time .Second )
135+ continue
136+ }
137+ return
138+ }
139+ }
140+ }
141+
142+ func (b * Backend ) onConnectionLost (c mqtt.Client , reason error ) {
143+ log .Errorf ("backend/mqttpubsub: mqtt connection error: %s" , reason )
144+ }
0 commit comments