Skip to content

Commit 520a40f

Browse files
antontroshinyaron2
andauthored
fix(mqtt): ensure proper locking when subscribing and unsubscribing topics (#3864)
Signed-off-by: Anton Troshin <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent a487870 commit 520a40f

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

pubsub/mqtt3/mqtt.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type mqttPubSub struct {
4646
logger logger.Logger
4747
topics map[string]mqttPubSubSubscription
4848
subscribingLock sync.RWMutex
49+
deletionLock sync.RWMutex
4950
reconnectCh chan struct{}
5051
closeCh chan struct{}
5152
closed atomic.Bool
@@ -143,11 +144,11 @@ func (m *mqttPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest,
143144

144145
m.subscribingLock.Lock()
145146
defer m.subscribingLock.Unlock()
146-
147147
// Add the topic then start the subscription
148148
m.addTopic(topic, handler)
149149

150150
token := m.conn.Subscribe(topic, m.metadata.Qos, m.onMessage(ctx))
151+
151152
var err error
152153
select {
153154
case <-token.Done():
@@ -180,7 +181,9 @@ func (m *mqttPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest,
180181
}
181182

182183
// Delete the topic from the map first, which stops routing messages to handlers
184+
m.deletionLock.Lock()
183185
delete(m.topics, topic)
186+
m.deletionLock.Unlock()
184187

185188
// We will call Unsubscribe only if cleanSession is true or if "unsubscribeOnClose" in the request metadata is true
186189
// Otherwise, calling this will make the broker lose the position of our subscription, which is not what we want if we are going to reconnect later

0 commit comments

Comments
 (0)