Skip to content

Commit 5bd814c

Browse files
jmacwhytedeadprogram
authored andcommitted
MQTT: adds keepalive pinging, disconnect, and graceful goroutine cleanup
1 parent 41b7f06 commit 5bd814c

File tree

2 files changed

+134
-31
lines changed

2 files changed

+134
-31
lines changed

net/mqtt/mqtt.go

Lines changed: 116 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package mqtt
55
import (
66
"errors"
77
"strings"
8+
"sync"
89
"time"
910

1011
"github.com/eclipse/paho.mqtt.golang/packets"
@@ -19,20 +20,32 @@ import (
1920
func NewClient(o *ClientOptions) Client {
2021
c := &mqttclient{opts: o, adaptor: o.Adaptor}
2122
c.msgRouter, c.stopRouter = newRouter()
23+
24+
c.inboundPacketChan = make(chan packets.ControlPacket, 10)
25+
c.stopInbound = make(chan struct{})
26+
c.incomingPubChan = make(chan *packets.PublishPacket, 10)
27+
// this launches a goroutine, so only call once per client:
28+
c.msgRouter.matchAndDispatch(c.incomingPubChan, c.opts.Order, c)
2229
return c
2330
}
2431

2532
type mqttclient struct {
26-
adaptor net.Adapter
27-
conn net.Conn
28-
connected bool
29-
opts *ClientOptions
30-
mid uint16
31-
inbound chan packets.ControlPacket
32-
stop chan struct{}
33-
msgRouter *router
34-
stopRouter chan bool
35-
incomingPubChan chan *packets.PublishPacket
33+
adaptor net.Adapter
34+
conn net.Conn
35+
connected bool
36+
opts *ClientOptions
37+
mid uint16
38+
inboundPacketChan chan packets.ControlPacket
39+
stopInbound chan struct{}
40+
msgRouter *router
41+
stopRouter chan bool
42+
incomingPubChan chan *packets.PublishPacket
43+
// stats for keepalive
44+
lastReceive time.Time
45+
lastSend time.Time
46+
// keep track of routines and signal a shutdown
47+
workers sync.WaitGroup
48+
shutdown bool
3649
}
3750

3851
// AddRoute allows you to add a handler for messages on a specific topic
@@ -56,6 +69,9 @@ func (c *mqttclient) IsConnectionOpen() bool {
5669

5770
// Connect will create a connection to the message broker.
5871
func (c *mqttclient) Connect() Token {
72+
if c.IsConnected() {
73+
return &mqtttoken{}
74+
}
5975
var err error
6076

6177
// make connection
@@ -77,10 +93,6 @@ func (c *mqttclient) Connect() Token {
7793
}
7894

7995
c.mid = 1
80-
c.inbound = make(chan packets.ControlPacket, 10)
81-
c.stop = make(chan struct{})
82-
c.incomingPubChan = make(chan *packets.PublishPacket, 10)
83-
c.msgRouter.matchAndDispatch(c.incomingPubChan, c.opts.Order, c)
8496

8597
// send the MQTT connect message
8698
connectPkt := packets.NewControlPacket(packets.Connect).(*packets.ConnectPacket)
@@ -98,7 +110,7 @@ func (c *mqttclient) Connect() Token {
98110
connectPkt.ClientIdentifier = c.opts.ClientID
99111
connectPkt.ProtocolVersion = byte(c.opts.ProtocolVersion)
100112
connectPkt.ProtocolName = "MQTT"
101-
connectPkt.Keepalive = 60
113+
connectPkt.Keepalive = uint16(c.opts.KeepAlive)
102114

103115
connectPkt.WillFlag = c.opts.WillEnabled
104116
connectPkt.WillTopic = c.opts.WillTopic
@@ -110,6 +122,7 @@ func (c *mqttclient) Connect() Token {
110122
if err != nil {
111123
return &mqtttoken{err: err}
112124
}
125+
c.lastSend = time.Now()
113126

114127
// TODO: handle timeout as ReadPacket blocks until it gets a packet.
115128
// CONNECT response.
@@ -127,20 +140,36 @@ func (c *mqttclient) Connect() Token {
127140
}
128141
}
129142

130-
go readMessages(c)
131143
go processInbound(c)
144+
go readMessages(c)
145+
go keepAlive(c)
132146

133147
return &mqtttoken{}
134148
}
135149

136150
// Disconnect will end the connection with the server, but not before waiting
137151
// the specified number of milliseconds to wait for existing work to be
138-
// completed.
152+
// completed. Blocks until disconnected.
139153
func (c *mqttclient) Disconnect(quiesce uint) {
140-
c.conn.Close()
154+
c.shutdownRoutines()
155+
// block until all done
156+
for c.connected {
157+
time.Sleep(time.Millisecond * 10)
158+
}
141159
return
142160
}
143161

162+
// shutdownRoutines will disconnect and shut down all processes. If you want to trigger a
163+
// disconnect internally, make sure you call this instead of Disconnect() to avoid deadlocks
164+
func (c *mqttclient) shutdownRoutines() {
165+
if c.shutdown {
166+
return
167+
}
168+
c.shutdown = true
169+
c.conn.Close()
170+
c.stopInbound <- struct{}{}
171+
}
172+
144173
// Publish will publish a message with the specified QoS and content
145174
// to the specified topic.
146175
// Returns a token to track delivery of the message to the broker
@@ -153,6 +182,7 @@ func (c *mqttclient) Publish(topic string, qos byte, retained bool, payload inte
153182
pub.Qos = qos
154183
pub.TopicName = topic
155184
pub.Retain = retained
185+
156186
switch payload.(type) {
157187
case string:
158188
pub.Payload = []byte(payload.(string))
@@ -168,6 +198,8 @@ func (c *mqttclient) Publish(topic string, qos byte, retained bool, payload inte
168198
if err != nil {
169199
return &mqtttoken{err: err}
170200
}
201+
// update this for every control message that is sent successfully, for keepalive
202+
c.lastSend = time.Now()
171203

172204
return &mqtttoken{}
173205
}
@@ -195,6 +227,7 @@ func (c *mqttclient) Subscribe(topic string, qos byte, callback MessageHandler)
195227
if err != nil {
196228
return &mqtttoken{err: err}
197229
}
230+
c.lastSend = time.Now()
198231

199232
return &mqtttoken{}
200233
}
@@ -220,12 +253,13 @@ func (c *mqttclient) OptionsReader() ClientOptionsReader {
220253
}
221254

222255
func processInbound(c *mqttclient) {
256+
PROCESS:
223257
for {
224258
select {
225-
case msg := <-c.inbound:
259+
case msg := <-c.inboundPacketChan:
226260
switch m := msg.(type) {
227261
case *packets.PingrespPacket:
228-
// TODO: handle this
262+
// println("pong")
229263
case *packets.SubackPacket:
230264
// TODO: handle this
231265
case *packets.UnsubackPacket:
@@ -242,33 +276,85 @@ func processInbound(c *mqttclient) {
242276
case *packets.PubcompPacket:
243277
// TODO: handle this
244278
}
245-
case <-c.stop:
246-
return
279+
case <-c.stopInbound:
280+
break PROCESS
247281
}
248282
}
283+
284+
// as this routine could be the last to finish (if a lot of messages are queued in the
285+
// channel), it is the last to turn out the lights
286+
287+
c.workers.Wait()
288+
c.connected = false
289+
c.shutdown = false
249290
}
250291

251292
// readMessages reads incoming messages off the wire.
252-
// incoming messages are then send into inbound channel.
293+
// incoming messages are then send into inbound buffered channel.
253294
func readMessages(c *mqttclient) {
295+
c.workers.Add(1)
296+
defer c.workers.Done()
297+
254298
var err error
255299
var cp packets.ControlPacket
256300

257-
PROCESS:
258-
for {
301+
for !c.shutdown {
259302
if cp, err = c.ReadPacket(); err != nil {
260-
break PROCESS
303+
c.shutdownRoutines()
304+
return
261305
}
262306
if cp != nil {
263-
c.inbound <- cp
264-
// TODO: Notify keepalive logic that we recently received a packet
307+
c.inboundPacketChan <- cp
308+
// notify keepalive logic that we recently received a packet
309+
c.lastReceive = time.Now()
265310
}
266311

267312
time.Sleep(100 * time.Millisecond)
268313
}
314+
}
269315

270-
// TODO: handle if we received an error on read.
271-
// If disconnect is in progress, swallow error and return
316+
// keepAlive is a goroutine to handle sending ping requests according to the MQTT spec. If the keepalive time has
317+
// been reached with no messages being sent, we will send a ping request and check back to see if we've
318+
// had any activity by the timeout. If not, disconnect.
319+
func keepAlive(c *mqttclient) {
320+
c.workers.Add(1)
321+
defer c.workers.Done()
322+
323+
var err error
324+
var ping *packets.PingreqPacket
325+
var timeout, pingsent time.Time
326+
327+
for !c.shutdown {
328+
// As long as we haven't reached the keepalive value...
329+
if time.Since(c.lastSend) < time.Duration(c.opts.KeepAlive)*time.Second {
330+
// ...sleep and check shutdown status again
331+
time.Sleep(time.Millisecond * 100)
332+
continue
333+
}
334+
335+
// value has been reached, so send a ping request
336+
ping = packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
337+
if err = ping.Write(c.conn); err != nil {
338+
// if connection is lost, report disconnect
339+
c.shutdownRoutines()
340+
return
341+
}
342+
// println("ping")
343+
344+
c.lastSend = time.Now()
345+
pingsent = time.Now()
346+
timeout = pingsent.Add(c.opts.PingTimeout)
347+
348+
// as long as we are still connected and haven't received anything after the ping...
349+
for !c.shutdown && c.lastReceive.Before(pingsent) {
350+
// if the timeout has passed, disconnect
351+
if time.Now().After(timeout) {
352+
c.shutdownRoutines()
353+
return
354+
}
355+
time.Sleep(time.Millisecond * 100)
356+
}
357+
}
272358
}
273359

274360
func (c *mqttclient) ackFunc(packet *packets.PublishPacket) func() {

net/mqtt/paho.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ type ClientOptions struct {
210210

211211
// NewClientOptions returns a new ClientOptions struct.
212212
func NewClientOptions() *ClientOptions {
213-
return &ClientOptions{Adaptor: net.ActiveDevice, ProtocolVersion: 4}
213+
return &ClientOptions{Adaptor: net.ActiveDevice, ProtocolVersion: 4, KeepAlive: 60, PingTimeout: time.Second * 10}
214214
}
215215

216216
// AddBroker adds a broker URI to the list of brokers to be used. The format should be
@@ -257,6 +257,23 @@ func (o *ClientOptions) SetPassword(p string) *ClientOptions {
257257
return o
258258
}
259259

260+
// SetKeepAlive will set the amount of time (in seconds) that the client
261+
// should wait before sending a PING request to the broker. This will
262+
// allow the client to know that a connection has not been lost with the
263+
// server.
264+
func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions {
265+
o.KeepAlive = int64(k / time.Second)
266+
return o
267+
}
268+
269+
// SetPingTimeout will set the amount of time (in seconds) that the client
270+
// will wait after sending a PING request to the broker, before deciding
271+
// that the connection has been lost. Default is 10 seconds.
272+
func (o *ClientOptions) SetPingTimeout(k time.Duration) *ClientOptions {
273+
o.PingTimeout = k
274+
return o
275+
}
276+
260277
// SetWill accepts a string will message to be set. When the client connects,
261278
// it will give this will message to the broker, which will then publish the
262279
// provided payload (the will) to any clients that are subscribed to the provided

0 commit comments

Comments
 (0)