Skip to content

Commit 47536b7

Browse files
authored
Fix QoS 1 message delivery after server restart (#427)
Resolved an issue where persisted QoS 1 messages were not correctly loaded into the appropriate client instance during server startup.
1 parent 830de14 commit 47536b7

File tree

7 files changed

+14
-5
lines changed

7 files changed

+14
-5
lines changed

hooks/storage/badger/badger.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
292292
TopicName: pk.TopicName,
293293
Payload: pk.Payload,
294294
Created: pk.Created,
295+
Client: cl.ID,
295296
Origin: pk.Origin,
296297
Properties: storage.MessageProperties{
297298
PayloadFormat: props.PayloadFormat,
@@ -319,6 +320,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
319320
in := &storage.Message{
320321
ID: inflightKey(cl, pk),
321322
T: storage.InflightKey,
323+
Client: cl.ID,
322324
Origin: pk.Origin,
323325
PacketID: pk.PacketID,
324326
FixedHeader: pk.FixedHeader,

hooks/storage/bolt/bolt.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
260260
TopicName: pk.TopicName,
261261
Payload: pk.Payload,
262262
Created: pk.Created,
263+
Client: cl.ID,
263264
Origin: pk.Origin,
264265
Properties: storage.MessageProperties{
265266
PayloadFormat: props.PayloadFormat,
@@ -287,6 +288,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
287288
in := &storage.Message{
288289
ID: inflightKey(cl, pk),
289290
T: storage.InflightKey,
291+
Client: cl.ID,
290292
Origin: pk.Origin,
291293
FixedHeader: pk.FixedHeader,
292294
TopicName: pk.TopicName,

hooks/storage/pebble/pebble.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
268268
TopicName: pk.TopicName,
269269
Payload: pk.Payload,
270270
Created: pk.Created,
271+
Client: cl.ID,
271272
Origin: pk.Origin,
272273
Properties: storage.MessageProperties{
273274
PayloadFormat: props.PayloadFormat,
@@ -295,6 +296,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
295296
in := &storage.Message{
296297
ID: inflightKey(cl, pk),
297298
T: storage.InflightKey,
299+
Client: cl.ID,
298300
Origin: pk.Origin,
299301
PacketID: pk.PacketID,
300302
FixedHeader: pk.FixedHeader,

hooks/storage/redis/redis.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
287287
TopicName: pk.TopicName,
288288
Payload: pk.Payload,
289289
Created: pk.Created,
290+
Client: cl.ID,
290291
Origin: pk.Origin,
291292
Properties: storage.MessageProperties{
292293
PayloadFormat: props.PayloadFormat,
@@ -317,6 +318,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
317318
in := &storage.Message{
318319
ID: inflightKey(cl, pk),
319320
T: storage.InflightKey,
321+
Client: cl.ID,
320322
Origin: pk.Origin,
321323
FixedHeader: pk.FixedHeader,
322324
TopicName: pk.TopicName,

hooks/storage/storage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ type Message struct {
8989
Payload []byte `json:"payload"` // the message payload (if retained)
9090
T string `json:"t,omitempty"` // the data type
9191
ID string `json:"id,omitempty" storm:"id"` // the storage key
92+
Client string `json:"client,omitempty"` // the client id the message is for
9293
Origin string `json:"origin,omitempty"` // the id of the client who sent the message
9394
TopicName string `json:"topic_name,omitempty"` // the topic the message was sent to (if retained)
9495
FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message

server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1672,7 +1672,7 @@ func (s *Server) loadClients(v []storage.Client) {
16721672
// loadInflight restores inflight messages from the datastore.
16731673
func (s *Server) loadInflight(v []storage.Message) {
16741674
for _, msg := range v {
1675-
if client, ok := s.Clients.Get(msg.Origin); ok {
1675+
if client, ok := s.Clients.Get(msg.Client); ok {
16761676
client.State.Inflight.Set(msg.ToPacket())
16771677
}
16781678
}

server_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3416,10 +3416,10 @@ func TestServerLoadInflightMessages(t *testing.T) {
34163416
require.Equal(t, 3, s.Clients.Len())
34173417

34183418
v := []storage.Message{
3419-
{Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
3420-
{Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
3421-
{Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
3422-
{Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
3419+
{Client: "mochi", Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
3420+
{Client: "mochi", Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
3421+
{Client: "zen", Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
3422+
{Client: "mochi-co", Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
34233423
}
34243424
s.loadInflight(v)
34253425

0 commit comments

Comments
 (0)