Skip to content

Commit 81baadf

Browse files
committed
Refactor backend <> integration communication.
1 parent 1838fcd commit 81baadf

File tree

15 files changed

+565
-388
lines changed

15 files changed

+565
-388
lines changed

cmd/chirpstack-gateway-bridge/cmd/root_run.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ func run(cmd *cobra.Command, args []string) error {
3232
setupMetrics,
3333
setupMetaData,
3434
setupCommands,
35+
startIntegration,
36+
startBackend,
3537
}
3638

3739
for _, t := range tasks {
@@ -109,3 +111,17 @@ func setupCommands() error {
109111
}
110112
return nil
111113
}
114+
115+
func startIntegration() error {
116+
if err := integration.GetIntegration().Start(); err != nil {
117+
return errors.Wrap(err, "start integration error")
118+
}
119+
return nil
120+
}
121+
122+
func startBackend() error {
123+
if err := backend.GetBackend().Start(); err != nil {
124+
return errors.Wrap(err, "start backend error")
125+
}
126+
return nil
127+
}

internal/backend/backend.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,26 @@ func GetBackend() Backend {
4444

4545
// Backend defines the interface that a backend must implement
4646
type Backend interface {
47-
// Close closes the backend.
48-
Close() error
47+
// Stop closes the backend.
48+
Stop() error
4949

50-
// GetDownlinkTXAckChan returns the channel for downlink tx acknowledgements.
51-
GetDownlinkTXAckChan() chan gw.DownlinkTXAck
50+
// Start starts the backend.
51+
Start() error
5252

53-
// GetGatewayStatsChan returns the channel for gateway statistics.
54-
GetGatewayStatsChan() chan gw.GatewayStats
53+
// SetDownlinkTxAckFunc sets the DownlinkTXAck handler func.
54+
SetDownlinkTxAckFunc(func(gw.DownlinkTXAck))
5555

56-
// GetUplinkFrameChan returns the channel for received uplinks.
57-
GetUplinkFrameChan() chan gw.UplinkFrame
56+
// SetGatewayStatsFunc sets the GatewayStats handler func.
57+
SetGatewayStatsFunc(func(gw.GatewayStats))
5858

59-
// GetRawPacketForwarderEventChan returns the raw packet-forwarder command channel.
60-
GetRawPacketForwarderEventChan() chan gw.RawPacketForwarderEvent
59+
// SetUplinkFrameFunc sets the UplinkFrame handler func.
60+
SetUplinkFrameFunc(func(gw.UplinkFrame))
6161

62-
// GetSubscribeEventChan returns the channel for the (un)subscribe events.
63-
GetSubscribeEventChan() chan events.Subscribe
62+
// SetRawPacketForwarderEventFunc sets the RawPacketForwarderEvent handler func.
63+
SetRawPacketForwarderEventFunc(func(gw.RawPacketForwarderEvent))
64+
65+
// SetSubscribeEventFunc sets the Subscribe handler func.
66+
SetSubscribeEventFunc(func(events.Subscribe))
6467

6568
// SendDownlinkFrame sends the given downlink frame.
6669
SendDownlinkFrame(gw.DownlinkFrame) error

internal/backend/basicstation/backend.go

Lines changed: 86 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ var upgrader = websocket.Upgrader{
4242
type Backend struct {
4343
sync.RWMutex
4444

45+
caCert string
46+
tlsCert string
47+
tlsKey string
48+
49+
server *http.Server
4550
ln net.Listener
4651
scheme string
4752
isClosed bool
@@ -53,10 +58,10 @@ type Backend struct {
5358

5459
gateways gateways
5560

56-
downlinkTXAckChan chan gw.DownlinkTXAck
57-
uplinkFrameChan chan gw.UplinkFrame
58-
gatewayStatsChan chan gw.GatewayStats
59-
rawPacketForwarderEventChan chan gw.RawPacketForwarderEvent
61+
downlinkTxAckFunc func(gw.DownlinkTXAck)
62+
uplinkFrameFunc func(gw.UplinkFrame)
63+
gatewayStatsFunc func(gw.GatewayStats)
64+
rawPacketForwarderEventFunc func(gw.RawPacketForwarderEvent)
6065

6166
band band.Band
6267
region band.Name
@@ -79,14 +84,12 @@ func NewBackend(conf config.Config) (*Backend, error) {
7984
scheme: "ws",
8085

8186
gateways: gateways{
82-
gateways: make(map[lorawan.EUI64]gateway),
83-
subscribeEventChan: make(chan events.Subscribe),
87+
gateways: make(map[lorawan.EUI64]gateway),
8488
},
8589

86-
downlinkTXAckChan: make(chan gw.DownlinkTXAck),
87-
uplinkFrameChan: make(chan gw.UplinkFrame),
88-
gatewayStatsChan: make(chan gw.GatewayStats),
89-
rawPacketForwarderEventChan: make(chan gw.RawPacketForwarderEvent),
90+
caCert: conf.Backend.BasicStation.CACert,
91+
tlsCert: conf.Backend.BasicStation.TLSCert,
92+
tlsKey: conf.Backend.BasicStation.TLSKey,
9093

9194
statsInterval: conf.Backend.BasicStation.StatsInterval,
9295
pingInterval: conf.Backend.BasicStation.PingInterval,
@@ -150,74 +153,52 @@ func NewBackend(conf config.Config) (*Backend, error) {
150153
}
151154

152155
// init HTTP server
153-
server := &http.Server{
156+
b.server = &http.Server{
154157
Handler: mux,
155158
}
156159

157160
// if the CA cert is configured, setup client certificate verification.
158-
if conf.Backend.BasicStation.CACert != "" {
159-
rawCACert, err := ioutil.ReadFile(conf.Backend.BasicStation.CACert)
161+
if b.caCert != "" {
162+
rawCACert, err := ioutil.ReadFile(b.caCert)
160163
if err != nil {
161164
return nil, errors.Wrap(err, "read ca cert error")
162165
}
163166

164167
caCertPool := x509.NewCertPool()
165168
caCertPool.AppendCertsFromPEM(rawCACert)
166169

167-
server.TLSConfig = &tls.Config{
170+
b.server.TLSConfig = &tls.Config{
168171
ClientCAs: caCertPool,
169172
ClientAuth: tls.RequireAndVerifyClientCert,
170173
}
171174
}
172175

173-
go func() {
174-
log.WithFields(log.Fields{
175-
"bind": b.ln.Addr(),
176-
"tls_cert": conf.Backend.BasicStation.TLSCert,
177-
"tls_key": conf.Backend.BasicStation.TLSKey,
178-
"ca_cert": conf.Backend.BasicStation.CACert,
179-
}).Info("backend/basicstation: starting websocket listener")
180-
181-
if conf.Backend.BasicStation.TLSCert == "" && conf.Backend.BasicStation.TLSKey == "" && conf.Backend.BasicStation.CACert == "" {
182-
// no tls
183-
if err := server.Serve(b.ln); err != nil && !b.isClosed {
184-
log.WithError(err).Fatal("backend/basicstation: server error")
185-
}
186-
} else {
187-
// tls
188-
b.scheme = "wss"
189-
if err := server.ServeTLS(b.ln, conf.Backend.BasicStation.TLSCert, conf.Backend.BasicStation.TLSKey); err != nil && !b.isClosed {
190-
log.WithError(err).Fatal("backend/basicstation: server error")
191-
}
192-
}
193-
}()
194-
195176
return &b, nil
196177
}
197178

198-
// GetDownlinkTXAckChan returns the channel for downlink tx acknowledgements.
199-
func (b *Backend) GetDownlinkTXAckChan() chan gw.DownlinkTXAck {
200-
return b.downlinkTXAckChan
179+
// SetDownlinkTxAckFunc sets the DownlinkTXAck handler func.
180+
func (b *Backend) SetDownlinkTxAckFunc(f func(gw.DownlinkTXAck)) {
181+
b.downlinkTxAckFunc = f
201182
}
202183

203-
// GetGatewayStatsChan returns the channel for gateway statistics.
204-
func (b *Backend) GetGatewayStatsChan() chan gw.GatewayStats {
205-
return b.gatewayStatsChan
184+
// SetGatewayStatsFunc sets the GatewayStats handler func.
185+
func (b *Backend) SetGatewayStatsFunc(f func(gw.GatewayStats)) {
186+
b.gatewayStatsFunc = f
206187
}
207188

208-
// GetUplinkFrameChan returns the channel for received uplinks.
209-
func (b *Backend) GetUplinkFrameChan() chan gw.UplinkFrame {
210-
return b.uplinkFrameChan
189+
// SetUplinkFrameFunc sets the UplinkFrame handler func.
190+
func (b *Backend) SetUplinkFrameFunc(f func(gw.UplinkFrame)) {
191+
b.uplinkFrameFunc = f
211192
}
212193

213-
// GetSubscribeEventChan returns the channel for the (un)subscribe events.
214-
func (b *Backend) GetSubscribeEventChan() chan events.Subscribe {
215-
return b.gateways.subscribeEventChan
194+
// SetRawPacketForwarderEventFunc sets the RawPacketForwarderEvent handler func.
195+
func (b *Backend) SetRawPacketForwarderEventFunc(f func(gw.RawPacketForwarderEvent)) {
196+
b.rawPacketForwarderEventFunc = f
216197
}
217198

218-
// GetRawPacketForwarderEventChan returns the raw packet-forwarder command channel.
219-
func (b *Backend) GetRawPacketForwarderEventChan() chan gw.RawPacketForwarderEvent {
220-
return b.rawPacketForwarderEventChan
199+
// SetSubscribeEventFunc sets the Subscribe handler func.
200+
func (b *Backend) SetSubscribeEventFunc(f func(events.Subscribe)) {
201+
b.gateways.subscribeEventFunc = f
221202
}
222203

223204
// SendDownlinkFrame sends the given downlink frame.
@@ -298,8 +279,35 @@ func (b *Backend) RawPacketForwarderCommand(pl gw.RawPacketForwarderCommand) err
298279
return nil
299280
}
300281

301-
// Close closes the backend.
302-
func (b *Backend) Close() error {
282+
// Start starts the backend.
283+
func (b *Backend) Start() error {
284+
go func() {
285+
log.WithFields(log.Fields{
286+
"bind": b.ln.Addr(),
287+
"ca_cert": b.caCert,
288+
"tls_cert": b.tlsCert,
289+
"tls_key": b.tlsKey,
290+
}).Info("backend/basicstation: starting websocket listener")
291+
292+
if b.tlsCert == "" && b.tlsKey == "" && b.caCert == "" {
293+
// no tls
294+
if err := b.server.Serve(b.ln); err != nil && !b.isClosed {
295+
log.WithError(err).Fatal("backend/basicstation: server error")
296+
}
297+
} else {
298+
// tls
299+
b.scheme = "wss"
300+
if err := b.server.ServeTLS(b.ln, b.tlsCert, b.tlsKey); err != nil && !b.isClosed {
301+
log.WithError(err).Fatal("backend/basicstation: server error")
302+
}
303+
}
304+
}()
305+
306+
return nil
307+
}
308+
309+
// Stop stops the backend.
310+
func (b *Backend) Stop() error {
303311
b.isClosed = true
304312
return b.ln.Close()
305313
}
@@ -438,14 +446,16 @@ func (b *Backend) handleGateway(r *http.Request, c *websocket.Conn) {
438446
b.statsCache.Delete(gwIDStr + ":tx")
439447
b.statsCache.Delete(gwIDStr + ":txOK")
440448

441-
b.gatewayStatsChan <- gw.GatewayStats{
442-
GatewayId: gatewayID[:],
443-
Time: ptypes.TimestampNow(),
444-
StatsId: id[:],
445-
RxPacketsReceived: rx,
446-
RxPacketsReceivedOk: rxOK,
447-
TxPacketsReceived: tx,
448-
TxPacketsEmitted: txOK,
449+
if b.gatewayStatsFunc != nil {
450+
b.gatewayStatsFunc(gw.GatewayStats{
451+
GatewayId: gatewayID[:],
452+
Time: ptypes.TimestampNow(),
453+
StatsId: id[:],
454+
RxPacketsReceived: rx,
455+
RxPacketsReceivedOk: rxOK,
456+
TxPacketsReceived: tx,
457+
TxPacketsEmitted: txOK,
458+
})
449459
}
450460
case <-done:
451461
return
@@ -622,7 +632,9 @@ func (b *Backend) handleJoinRequest(gatewayID lorawan.EUI64, v structs.JoinReque
622632
"uplink_id": uplinkID,
623633
}).Info("backend/basicstation: join-request received")
624634

625-
b.uplinkFrameChan <- uplinkFrame
635+
if b.uplinkFrameFunc != nil {
636+
b.uplinkFrameFunc(uplinkFrame)
637+
}
626638
}
627639

628640
func (b *Backend) handleProprietaryDataFrame(gatewayID lorawan.EUI64, v structs.UplinkProprietaryFrame) {
@@ -649,7 +661,9 @@ func (b *Backend) handleProprietaryDataFrame(gatewayID lorawan.EUI64, v structs.
649661
"uplink_id": uplinkID,
650662
}).Info("backend/basicstation: proprietary uplink frame received")
651663

652-
b.uplinkFrameChan <- uplinkFrame
664+
if b.uplinkFrameFunc != nil {
665+
b.uplinkFrameFunc(uplinkFrame)
666+
}
653667
}
654668

655669
func (b *Backend) handleDownlinkTransmittedMessage(gatewayID lorawan.EUI64, v structs.DownlinkTransmitted) {
@@ -676,7 +690,9 @@ func (b *Backend) handleDownlinkTransmittedMessage(gatewayID lorawan.EUI64, v st
676690
"downlink_id": downID,
677691
}).Info("backend/basicstation: downlink transmitted message received")
678692

679-
b.downlinkTXAckChan <- txack
693+
if b.downlinkTxAckFunc != nil {
694+
b.downlinkTxAckFunc(txack)
695+
}
680696
}
681697

682698
func (b *Backend) handleUplinkDataFrame(gatewayID lorawan.EUI64, v structs.UplinkDataFrame) {
@@ -703,7 +719,9 @@ func (b *Backend) handleUplinkDataFrame(gatewayID lorawan.EUI64, v structs.Uplin
703719
"uplink_id": uplinkID,
704720
}).Info("backend/basicstation: uplink frame received")
705721

706-
b.uplinkFrameChan <- uplinkFrame
722+
if b.uplinkFrameFunc != nil {
723+
b.uplinkFrameFunc(uplinkFrame)
724+
}
707725
}
708726

709727
func (b *Backend) handleRawPacketForwarderEvent(gatewayID lorawan.EUI64, pl []byte) {
@@ -726,7 +744,9 @@ func (b *Backend) handleRawPacketForwarderEvent(gatewayID lorawan.EUI64, pl []by
726744
"raw_id": rawID,
727745
}).Info("backend/basicstation: raw packet-forwarder event received")
728746

729-
b.rawPacketForwarderEventChan <- rawEvent
747+
if b.rawPacketForwarderEventFunc != nil {
748+
b.rawPacketForwarderEventFunc(rawEvent)
749+
}
730750
}
731751

732752
func (b *Backend) handleTimeSync(gatewayID lorawan.EUI64, v structs.TimeSyncRequest) {

0 commit comments

Comments
 (0)