55 "crypto/tls"
66 "crypto/x509"
77 "encoding/json"
8- "fmt"
98 "io/ioutil"
109 "sync"
1110 "text/template"
@@ -18,12 +17,30 @@ import (
1817 log "github.com/sirupsen/logrus"
1918)
2019
20+ // BackendConfig holds the MQTT pub-sub backend configuration.
21+ type BackendConfig struct {
22+ Server string
23+ Username string
24+ Password string
25+ QOS uint8 `mapstructure:"qos"`
26+ CleanSession bool `mapstructure:"clean_session"`
27+ ClientID string `mapstructure:"client_id"`
28+ CACert string `mapstructure:"ca_cert"`
29+ TLSCert string `mapstructure:"tls_cert"`
30+ TLSKey string `mapstructure:"tls_key"`
31+ UplinkTopicTemplate string `mapstructure:"uplink_topic_template"`
32+ DownlinkTopicTemplate string `mapstructure:"downlink_topic_template"`
33+ StatsTopicTemplate string `mapstructure:"stats_topic_template"`
34+ AckTopicTemplate string `mapstructure:"ack_topic_template"`
35+ }
36+
2137// Backend implements a MQTT pub-sub backend.
2238type Backend struct {
2339 conn mqtt.Client
2440 txPacketChan chan gw.TXPacketBytes
2541 gateways map [lorawan.EUI64 ]struct {}
2642 mutex sync.RWMutex
43+ config BackendConfig
2744
2845 UplinkTemplate * template.Template
2946 DownlinkTemplate * template.Template
@@ -32,54 +49,57 @@ type Backend struct {
3249}
3350
3451// NewBackend creates a new Backend.
35- func NewBackend (server , username , password , cafile , certFile , certKeyFile , uplinkTopic , downlinkTopic , statsTopic , ackTopic string ) (* Backend , error ) {
52+ func NewBackend (c BackendConfig ) (* Backend , error ) {
3653 var err error
3754
3855 b := Backend {
3956 txPacketChan : make (chan gw.TXPacketBytes ),
4057 gateways : make (map [lorawan.EUI64 ]struct {}),
58+ config : c ,
4159 }
4260
43- b .UplinkTemplate , err = template .New ("uplink" ).Parse (uplinkTopic )
61+ b .UplinkTemplate , err = template .New ("uplink" ).Parse (b . config . UplinkTopicTemplate )
4462 if err != nil {
4563 return nil , errors .Wrap (err , "parse uplink template error" )
4664 }
4765
48- b .DownlinkTemplate , err = template .New ("downlink" ).Parse (downlinkTopic )
66+ b .DownlinkTemplate , err = template .New ("downlink" ).Parse (b . config . DownlinkTopicTemplate )
4967 if err != nil {
5068 return nil , errors .Wrap (err , "parse downlink template error" )
5169 }
5270
53- b .StatsTemplate , err = template .New ("stats" ).Parse (statsTopic )
71+ b .StatsTemplate , err = template .New ("stats" ).Parse (b . config . StatsTopicTemplate )
5472 if err != nil {
5573 return nil , errors .Wrap (err , "parse stats template error" )
5674 }
5775
58- b .AckTemplate , err = template .New ("ack" ).Parse (ackTopic )
76+ b .AckTemplate , err = template .New ("ack" ).Parse (b . config . AckTopicTemplate )
5977 if err != nil {
6078 return nil , errors .Wrap (err , "parse ack template error" )
6179 }
6280
6381 opts := mqtt .NewClientOptions ()
64- opts .AddBroker (server )
65- opts .SetUsername (username )
66- opts .SetPassword (password )
82+ opts .AddBroker (b .config .Server )
83+ opts .SetUsername (b .config .Username )
84+ opts .SetPassword (b .config .Password )
85+ opts .SetCleanSession (b .config .CleanSession )
86+ opts .SetClientID (b .config .ClientID )
6787 opts .SetOnConnectHandler (b .onConnected )
6888 opts .SetConnectionLostHandler (b .onConnectionLost )
6989
70- tlsconfig , err := NewTLSConfig (cafile , certFile , certKeyFile )
90+ tlsconfig , err := NewTLSConfig (b . config . CACert , b . config . TLSCert , b . config . TLSKey )
7191 if err != nil {
7292 log .WithError (err ).WithFields (log.Fields {
73- "ca_cert" : cafile ,
74- "tls_cert" : certFile ,
75- "tls_key" : certKeyFile ,
93+ "ca_cert" : b . config . CACert ,
94+ "tls_cert" : b . config . TLSCert ,
95+ "tls_key" : b . config . TLSKey ,
7696 }).Fatal ("error loading mqtt certificate files" )
7797 }
7898 if tlsconfig != nil {
7999 opts .SetTLSConfig (tlsconfig )
80100 }
81101
82- log .WithField ("server" , server ).Info ("backend: connecting to mqtt broker" )
102+ log .WithField ("server" , b . config . Server ).Info ("backend: connecting to mqtt broker" )
83103 b .conn = mqtt .NewClient (opts )
84104 if token := b .conn .Connect (); token .Wait () && token .Error () != nil {
85105 return nil , token .Error ()
@@ -146,11 +166,14 @@ func (b *Backend) SubscribeGatewayTX(mac lorawan.EUI64) error {
146166
147167 topic := bytes .NewBuffer (nil )
148168 if err := b .DownlinkTemplate .Execute (topic , struct { MAC lorawan.EUI64 }{mac }); err != nil {
149- return errors .Wrap (err , "execute uplink template error" )
169+ return errors .Wrap (err , "execute downlink template error" )
150170 }
151171
152- log .WithField ("topic" , topic .String ()).Info ("backend: subscribing to topic" )
153- if token := b .conn .Subscribe (topic .String (), 0 , b .txPacketHandler ); token .Wait () && token .Error () != nil {
172+ log .WithFields (log.Fields {
173+ "topic" : topic .String (),
174+ "qos" : b .config .QOS ,
175+ }).Info ("backend: subscribing to topic" )
176+ if token := b .conn .Subscribe (topic .String (), b .config .QOS , b .txPacketHandler ); token .Wait () && token .Error () != nil {
154177 return token .Error ()
155178 }
156179 b .gateways [mac ] = struct {}{}
@@ -165,7 +188,7 @@ func (b *Backend) UnSubscribeGatewayTX(mac lorawan.EUI64) error {
165188
166189 topic := bytes .NewBuffer (nil )
167190 if err := b .DownlinkTemplate .Execute (topic , struct { MAC lorawan.EUI64 }{mac }); err != nil {
168- return errors .Wrap (err , "execute uplink template error" )
191+ return errors .Wrap (err , "execute downlink template error" )
169192 }
170193
171194 log .WithField ("topic" , topic .String ()).Info ("backend: unsubscribing from topic" )
@@ -201,8 +224,11 @@ func (b *Backend) publish(mac lorawan.EUI64, topicTemplate *template.Template, v
201224 if err != nil {
202225 return err
203226 }
204- log .WithField ("topic" , topic .String ()).Info ("backend: publishing packet" )
205- if token := b .conn .Publish (topic .String (), 0 , false , bytes ); token .Wait () && token .Error () != nil {
227+ log .WithFields (log.Fields {
228+ "topic" : topic .String (),
229+ "qos" : b .config .QOS ,
230+ }).Info ("backend: publishing packet" )
231+ if token := b .conn .Publish (topic .String (), b .config .QOS , false , bytes ); token .Wait () && token .Error () != nil {
206232 return token .Error ()
207233 }
208234 return nil
@@ -228,7 +254,13 @@ func (b *Backend) onConnected(c mqtt.Client) {
228254 log .WithField ("topic_count" , len (b .gateways )).Info ("backend: re-registering to gateway topics" )
229255 topics := make (map [string ]byte )
230256 for k := range b .gateways {
231- topics [fmt .Sprintf ("gateway/%s/tx" , k )] = 0
257+ topic := bytes .NewBuffer (nil )
258+ if err := b .DownlinkTemplate .Execute (topic , struct { MAC lorawan.EUI64 }{k }); err != nil {
259+ log .WithError (err ).Error ("backend: execute downlink template error" )
260+ time .Sleep (time .Second )
261+ continue
262+ }
263+ topics [topic .String ()] = b .config .QOS
232264 }
233265 if token := b .conn .SubscribeMultiple (topics , b .txPacketHandler ); token .Wait () && token .Error () != nil {
234266 log .WithField ("topic_count" , len (topics )).Errorf ("backend: subscribe multiple failed: %s" , token .Error ())
0 commit comments