Skip to content
This repository was archived by the owner on May 2, 2025. It is now read-only.

Commit 651b572

Browse files
committed
Use Redis Stream ID for de-duplication of device events.
The Redis Stream is incremented for each event (by Redis). Using the PublishedAt timestamp has the risk of race-conditions as the timestamp is set on constructing the message, not on publishing it. Therefore it is possible that the latest message has a timestamp before the message that was published before, in which case the web-interface would ignore it.
1 parent 4db1825 commit 651b572

File tree

6 files changed

+25
-7
lines changed

6 files changed

+25
-7
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.16
55
require (
66
github.com/NickBall/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5
77
github.com/aws/aws-sdk-go v1.26.3
8-
github.com/brocaar/chirpstack-api/go/v3 v3.11.1
8+
github.com/brocaar/chirpstack-api/go/v3 v3.12.3
99
github.com/brocaar/lorawan v0.0.0-20210809075358-95fc1667572e
1010
github.com/coreos/go-oidc v2.2.1+incompatible
1111
github.com/eclipse/paho.mqtt.golang v1.3.1

go.sum

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwj
7272
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2 h1:oMCHnXa6CCCafdPDbMh/lWRhRByN0VFLvv+g+ayx1SI=
7373
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2/go.mod h1:PkYb9DJNAwrSvRx5DYA+gUcOIgTGVMNkfSCbZM8cWpI=
7474
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
75-
github.com/brocaar/chirpstack-api/go/v3 v3.11.1 h1:/CpPFxvaNcF0yEE+Y0t2BJF529sciIMTsK/Wx565Z7c=
76-
github.com/brocaar/chirpstack-api/go/v3 v3.11.1/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
75+
github.com/brocaar/chirpstack-api/go/v3 v3.12.3 h1:/sj8cIpoWrlJWwWznF2lwOxPefLamnKPyHglAFfll6s=
76+
github.com/brocaar/chirpstack-api/go/v3 v3.12.3/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
7777
github.com/brocaar/lorawan v0.0.0-20210809075358-95fc1667572e h1:htxGGoTtAoy4p3qnq42qb0GfupCLe2AXJkSqzLYEPnA=
7878
github.com/brocaar/lorawan v0.0.0-20210809075358-95fc1667572e/go.mod h1:Vlf3gOwizqX4y3snWe/i2EqRT83HvYuwBjRu39PevW0=
7979
github.com/caarlos0/ctrlc v1.0.0 h1:2DtF8GSIcajgffDFJzyG15vO+1PuBWOMUdFut7NnXhw=
@@ -351,7 +351,6 @@ github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq
351351
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
352352
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
353353
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
354-
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
355354
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
356355
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
357356
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=

internal/api/external/device.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,7 @@ func (a *DeviceAPI) StreamEventLogs(req *pb.StreamDeviceEventLogsRequest, srv pb
848848
PublishedAt: el.PublishedAt,
849849
Type: el.Type,
850850
PayloadJson: string(b),
851+
StreamId: el.StreamID,
851852
}
852853

853854
err = srv.Send(&resp)

internal/eventlog/eventlog.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type EventLog struct {
4444
Type string
4545
PublishedAt *timestamp.Timestamp
4646
Payload json.RawMessage
47+
StreamID string
4748
}
4849

4950
// LogEventForDevice logs an event for the given device.
@@ -152,6 +153,7 @@ func GetEventLogForDevice(ctx context.Context, devEUI lorawan.EUI64, eventsChan
152153
Type: event,
153154
Payload: json.RawMessage(jsonB),
154155
PublishedAt: pl.GetPublishedAt(),
156+
StreamID: msg.ID,
155157
}
156158
}
157159
}

internal/integration/logger/logger_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ func (ts *LoggerTestSuite) TestHandleUplinkEvent() {
6161
}
6262
assert.NoError(ts.integration.HandleUplinkEvent(context.Background(), nil, nil, pl))
6363
el := <-ts.logChannel
64+
assert.NotEqual("", el.StreamID)
65+
el.StreamID = ""
6466
assert.Equal(toEventLog("up", &pl), el)
6567
}
6668

@@ -71,6 +73,8 @@ func (ts *LoggerTestSuite) TestJoinEvent() {
7173
}
7274
assert.NoError(ts.integration.HandleJoinEvent(context.Background(), nil, nil, pl))
7375
el := <-ts.logChannel
76+
assert.NotEqual("", el.StreamID)
77+
el.StreamID = ""
7478
assert.Equal(toEventLog("join", &pl), el)
7579
}
7680

@@ -81,6 +85,8 @@ func (ts *LoggerTestSuite) TestAckEvent() {
8185
}
8286
assert.NoError(ts.integration.HandleAckEvent(context.Background(), nil, nil, pl))
8387
el := <-ts.logChannel
88+
assert.NotEqual("", el.StreamID)
89+
el.StreamID = ""
8490
assert.Equal(toEventLog("ack", &pl), el)
8591
}
8692

@@ -91,6 +97,8 @@ func (ts *LoggerTestSuite) TestErrorEvent() {
9197
}
9298
assert.NoError(ts.integration.HandleErrorEvent(context.Background(), nil, nil, pl))
9399
el := <-ts.logChannel
100+
assert.NotEqual("", el.StreamID)
101+
el.StreamID = ""
94102
assert.Equal(toEventLog("error", &pl), el)
95103
}
96104

@@ -101,6 +109,8 @@ func (ts *LoggerTestSuite) TestStatusEvent() {
101109
}
102110
assert.NoError(ts.integration.HandleStatusEvent(context.Background(), nil, nil, pl))
103111
el := <-ts.logChannel
112+
assert.NotEqual("", el.StreamID)
113+
el.StreamID = ""
104114
assert.Equal(toEventLog("status", &pl), el)
105115
}
106116

@@ -111,6 +121,8 @@ func (ts *LoggerTestSuite) TestLocationEvent() {
111121
}
112122
assert.NoError(ts.integration.HandleLocationEvent(context.Background(), nil, nil, pl))
113123
el := <-ts.logChannel
124+
assert.NotEqual("", el.StreamID)
125+
el.StreamID = ""
114126
assert.Equal(toEventLog("location", &pl), el)
115127
}
116128

@@ -121,6 +133,8 @@ func (ts *LoggerTestSuite) TestTxAckEvent() {
121133
}
122134
assert.NoError(ts.integration.HandleTxAckEvent(context.Background(), nil, nil, pl))
123135
el := <-ts.logChannel
136+
assert.NotEqual("", el.StreamID)
137+
el.StreamID = ""
124138
assert.Equal(toEventLog("txack", &pl), el)
125139
}
126140

@@ -131,6 +145,8 @@ func (ts *LoggerTestSuite) TestIntegrationEvent() {
131145
}
132146
assert.NoError(ts.integration.HandleIntegrationEvent(context.Background(), nil, nil, pl))
133147
el := <-ts.logChannel
148+
assert.NotEqual("", el.StreamID)
149+
el.StreamID = ""
134150
assert.Equal(toEventLog("integration", &pl), el)
135151
}
136152

ui/src/views/devices/DeviceData.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,11 @@ class DeviceData extends Component {
224224
}
225225

226226
let data = this.state.data;
227-
const now = new Date();
228227

229-
if (data.length === 0 || moment(d.publishedAt).isAfter(data[0].publishedAt)) {
228+
// only append when stream id > last item.
229+
if (data.length === 0 || parseInt(d.streamID.replace("-", "")) > parseInt(data[0].id.replace("-", ""))) {
230230
data.unshift({
231-
id: now.getTime(),
231+
id: d.streamID,
232232
publishedAt: d.publishedAt,
233233
type: d.type,
234234
payload: JSON.parse(d.payloadJSON),

0 commit comments

Comments
 (0)