Skip to content

Commit e39633f

Browse files
committed
Implement additional gateway stats aggregations.
1 parent bbb0d54 commit e39633f

File tree

10 files changed

+689
-124
lines changed

10 files changed

+689
-124
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/brocaar/chirpstack-gateway-bridge
33
go 1.16
44

55
require (
6-
github.com/brocaar/chirpstack-api/go/v3 v3.10.1
6+
github.com/brocaar/chirpstack-api/go/v3 v3.11.0
77
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303
88
github.com/dgrijalva/jwt-go v3.2.0+incompatible
99
github.com/eclipse/paho.mqtt.golang v1.3.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
5151
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
5252
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2 h1:oMCHnXa6CCCafdPDbMh/lWRhRByN0VFLvv+g+ayx1SI=
5353
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2/go.mod h1:PkYb9DJNAwrSvRx5DYA+gUcOIgTGVMNkfSCbZM8cWpI=
54-
github.com/brocaar/chirpstack-api/go/v3 v3.10.1 h1:ijf3AKDw++aSPJE41lkiPxXtshxZF++cXzV2l9e7EYk=
55-
github.com/brocaar/chirpstack-api/go/v3 v3.10.1/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
54+
github.com/brocaar/chirpstack-api/go/v3 v3.11.0 h1:RV4DVNRlDqilrv7ttqg4REdTxFyQ8F8ETmymUgpC/cI=
55+
github.com/brocaar/chirpstack-api/go/v3 v3.11.0/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
5656
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303 h1:LkE19tFPfDaRh1HIKWLCZKSBZNonMu0rIOJPCLvEjC0=
5757
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303/go.mod h1:CciUmQHIpUYTHHMeICtyamM7d+47VV+WBZ5ReDozpoc=
5858
github.com/caarlos0/ctrlc v1.0.0 h1:2DtF8GSIcajgffDFJzyG15vO+1PuBWOMUdFut7NnXhw=

internal/backend/basicstation/backend.go

Lines changed: 29 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/brocaar/chirpstack-api/go/v3/gw"
2626
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/basicstation/structs"
2727
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/events"
28+
"github.com/brocaar/chirpstack-gateway-bridge/internal/backend/stats"
2829
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
2930
"github.com/brocaar/lorawan"
3031
"github.com/brocaar/lorawan/band"
@@ -71,9 +72,6 @@ type Backend struct {
7172
frequencyMax uint32
7273
routerConfig structs.RouterConfig
7374

74-
// Cache to store stats.
75-
statsCache *cache.Cache
76-
7775
// Cache to store diid to UUIDs.
7876
diidCache *cache.Cache
7977
}
@@ -100,8 +98,7 @@ func NewBackend(conf config.Config) (*Backend, error) {
10098
frequencyMin: conf.Backend.BasicStation.FrequencyMin,
10199
frequencyMax: conf.Backend.BasicStation.FrequencyMax,
102100

103-
diidCache: cache.New(time.Minute, time.Minute),
104-
statsCache: cache.New(conf.Backend.BasicStation.StatsInterval*2, conf.Backend.BasicStation.StatsInterval*2),
101+
diidCache: cache.New(time.Minute, time.Minute),
105102
}
106103

107104
for _, n := range conf.Filters.NetIDs {
@@ -226,10 +223,8 @@ func (b *Backend) SendDownlinkFrame(df gw.DownlinkFrame) error {
226223
copy(gatewayID[:], df.GetGatewayId())
227224
copy(downID[:], df.GetDownlinkId())
228225

229-
b.incrementTxStats(gatewayID)
230-
231-
// store token to UUID mapping
232-
b.diidCache.SetDefault(fmt.Sprintf("%d", df.Token), df.GetDownlinkId())
226+
// Store downlink under DIID in cache
227+
b.diidCache.SetDefault(fmt.Sprintf("%d", pl.DIID), df)
233228

234229
websocketSendCounter("dnmsg").Inc()
235230
if err := b.sendToGateway(gatewayID, pl); err != nil {
@@ -419,8 +414,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
419414

420415
// stats publishing loop
421416
go func() {
422-
gwIDStr := gatewayID.String()
423-
424417
for {
425418
select {
426419
case <-statsTicker.C:
@@ -430,35 +423,13 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
430423
continue
431424
}
432425

433-
var rx, rxOK, tx, txOK uint32
434-
if v, ok := b.statsCache.Get(gwIDStr + ":rx"); ok {
435-
rx = v.(uint32)
436-
}
437-
if v, ok := b.statsCache.Get(gwIDStr + ":rxOK"); ok {
438-
rxOK = v.(uint32)
439-
}
440-
if v, ok := b.statsCache.Get(gwIDStr + ":tx"); ok {
441-
tx = v.(uint32)
442-
}
443-
if v, ok := b.statsCache.Get(gwIDStr + ":txOK"); ok {
444-
txOK = v.(uint32)
445-
}
446-
447-
b.statsCache.Delete(gwIDStr + ":rx")
448-
b.statsCache.Delete(gwIDStr + ":rxOK")
449-
b.statsCache.Delete(gwIDStr + ":tx")
450-
b.statsCache.Delete(gwIDStr + ":txOK")
426+
stats := conn.stats.ExportStats()
427+
stats.GatewayId = gatewayID[:]
428+
stats.Time = ptypes.TimestampNow()
429+
stats.StatsId = id[:]
451430

452431
if b.gatewayStatsFunc != nil {
453-
b.gatewayStatsFunc(gw.GatewayStats{
454-
GatewayId: gatewayID[:],
455-
Time: ptypes.TimestampNow(),
456-
StatsId: id[:],
457-
RxPacketsReceived: rx,
458-
RxPacketsReceivedOk: rxOK,
459-
TxPacketsReceived: tx,
460-
TxPacketsEmitted: txOK,
461-
})
432+
b.gatewayStatsFunc(stats)
462433
}
463434
case <-done:
464435
return
@@ -523,7 +494,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
523494
b.handleVersion(gatewayID, pl)
524495
case structs.UplinkDataFrameMessage:
525496
// handle uplink
526-
b.incrementRxStats(gatewayID)
527497
var pl structs.UplinkDataFrame
528498
if err := json.Unmarshal(msg, &pl); err != nil {
529499
log.WithError(err).WithFields(log.Fields{
@@ -536,7 +506,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
536506
b.handleUplinkDataFrame(gatewayID, pl)
537507
case structs.JoinRequestMessage:
538508
// handle join-request
539-
b.incrementRxStats(gatewayID)
540509
var pl structs.JoinRequest
541510
if err := json.Unmarshal(msg, &pl); err != nil {
542511
log.WithError(err).WithFields(log.Fields{
@@ -549,7 +518,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
549518
b.handleJoinRequest(gatewayID, pl)
550519
case structs.ProprietaryDataFrameMessage:
551520
// handle proprietary uplink
552-
b.incrementRxStats(gatewayID)
553521
var pl structs.UplinkProprietaryFrame
554522
if err := json.Unmarshal(msg, &pl); err != nil {
555523
log.WithError(err).WithFields(log.Fields{
@@ -562,7 +530,6 @@ func (b *Backend) handleGateway(r *http.Request, conn *connection) {
562530
b.handleProprietaryDataFrame(gatewayID, pl)
563531
case structs.DownlinkTransmittedMessage:
564532
// handle downlink transmitted
565-
b.incrementTxOkStats(gatewayID)
566533
var pl structs.DownlinkTransmitted
567534
if err := json.Unmarshal(msg, &pl); err != nil {
568535
log.WithError(err).WithFields(log.Fields{
@@ -630,6 +597,10 @@ func (b *Backend) handleJoinRequest(gatewayID lorawan.EUI64, v structs.JoinReque
630597
}
631598
uplinkFrame.RxInfo.UplinkId = uplinkID[:]
632599

600+
if conn, err := b.gateways.get(gatewayID); err == nil {
601+
conn.stats.CountUplink(&uplinkFrame)
602+
}
603+
633604
log.WithFields(log.Fields{
634605
"gateway_id": gatewayID,
635606
"uplink_id": uplinkID,
@@ -659,6 +630,10 @@ func (b *Backend) handleProprietaryDataFrame(gatewayID lorawan.EUI64, v structs.
659630
}
660631
uplinkFrame.RxInfo.UplinkId = uplinkID[:]
661632

633+
if conn, err := b.gateways.get(gatewayID); err == nil {
634+
conn.stats.CountUplink(&uplinkFrame)
635+
}
636+
662637
log.WithFields(log.Fields{
663638
"gateway_id": gatewayID,
664639
"uplink_id": uplinkID,
@@ -682,7 +657,12 @@ func (b *Backend) handleDownlinkTransmittedMessage(gatewayID lorawan.EUI64, v st
682657
}
683658

684659
if v, ok := b.diidCache.Get(fmt.Sprintf("%d", v.DIID)); ok {
685-
txack.DownlinkId = v.([]byte)
660+
pl := v.(gw.DownlinkFrame)
661+
txack.DownlinkId = pl.DownlinkId
662+
663+
if conn, err := b.gateways.get(gatewayID); err == nil {
664+
conn.stats.CountDownlink(&pl, &txack)
665+
}
686666
}
687667

688668
var downID uuid.UUID
@@ -717,6 +697,11 @@ func (b *Backend) handleUplinkDataFrame(gatewayID lorawan.EUI64, v structs.Uplin
717697
}
718698
uplinkFrame.RxInfo.UplinkId = uplinkID[:]
719699

700+
// count metrics
701+
if conn, err := b.gateways.get(gatewayID); err == nil {
702+
conn.stats.CountUplink(&uplinkFrame)
703+
}
704+
720705
log.WithFields(log.Fields{
721706
"gateway_id": gatewayID,
722707
"uplink_id": uplinkID,
@@ -835,7 +820,7 @@ func (b *Backend) websocketWrap(handler func(*http.Request, *connection), w http
835820

836821
// Wrap the conn inside a gateway struct, so that we can lock it when writing
837822
// data.
838-
c := connection{conn: conn}
823+
c := connection{conn: conn, stats: stats.NewCollector()}
839824

840825
go func() {
841826
for {
@@ -858,31 +843,3 @@ func (b *Backend) websocketWrap(handler func(*http.Request, *connection), w http
858843
handler(r, &c)
859844
done <- struct{}{}
860845
}
861-
862-
func (b *Backend) incrementRxStats(id lorawan.EUI64) {
863-
idStr := id.String()
864-
865-
if _, err := b.statsCache.IncrementUint32(idStr+":rx", uint32(1)); err != nil {
866-
b.statsCache.SetDefault(idStr+":rx", uint32(1))
867-
}
868-
869-
if _, err := b.statsCache.IncrementUint32(idStr+":rxOK", uint32(1)); err != nil {
870-
b.statsCache.SetDefault(idStr+":rxOK", uint32(1))
871-
}
872-
}
873-
874-
func (b *Backend) incrementTxOkStats(id lorawan.EUI64) {
875-
idStr := id.String()
876-
877-
if _, err := b.statsCache.IncrementUint32(idStr+"txOK", uint32(1)); err != nil {
878-
b.statsCache.SetDefault(idStr+":txOK", uint32(1))
879-
}
880-
}
881-
882-
func (b *Backend) incrementTxStats(id lorawan.EUI64) {
883-
idStr := id.String()
884-
885-
if _, err := b.statsCache.IncrementUint32(idStr+"tx", uint32(1)); err != nil {
886-
b.statsCache.SetDefault(idStr+":tx", uint32(1))
887-
}
888-
}

0 commit comments

Comments
 (0)