Skip to content

Commit c8aa654

Browse files
robertsLandoynagasakidmitry-kurmanovvishnureddy17bverhoeven
authored
feat: option to disable writeCache and fix leak in subscriptions (#1622)
@vishnureddy17 Fix typo in README 00bf657 @bverhoeven Fix memory leak in subscription topic mapping (#1535) 8c77eec @mwohlert @robertsLando @ynagasaki feat: allow user to disable pre-generated write cache (#1151) 0d11888 Co-authored-by: Daniel Lando <[email protected]> Co-authored-by: Yoshi Nagasaki <[email protected]> Co-authored-by: Dmitry Kurmanov <[email protected]> Co-authored-by: Vishnu Reddy <[email protected]> Co-authored-by: Bas Verhoeven <[email protected]> Co-authored-by: Michel Wohlert <[email protected]>
1 parent dcb24de commit c8aa654

File tree

4 files changed

+65
-6
lines changed

4 files changed

+65
-6
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ to use MQTT.js in the browser see the [browserify](#browserify) section
135135
### CommonJS (Require)
136136

137137
```js
138-
const mqtt = require("mqtt"); // require mqtt
139-
const client = mqtt.connect("est.mosquitto.org"); // create a client
138+
const mqtt = require("mqtt") // require mqtt
139+
const client = mqtt.connect("test.mosquitto.org") // create a client
140140
```
141141

142142
### ES6 Modules (Import)

lib/client.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ const defaultConnectOptions = {
2929
reconnectPeriod: 1000,
3030
connectTimeout: 30 * 1000,
3131
clean: true,
32-
resubscribe: true
32+
resubscribe: true,
33+
writeCache: true
3334
}
3435

3536
const socketErrors = [
@@ -278,6 +279,11 @@ function MqttClient (streamBuilder, options) {
278279

279280
this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
280281

282+
// Disable pre-generated write cache if requested. Will allocate buffers on-the-fly instead. WARNING: This can affect write performance
283+
if (!this.options.writeCache) {
284+
mqttPacket.writeToStream.cacheNumbers = false
285+
}
286+
281287
this.streamBuilder = streamBuilder
282288

283289
this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider
@@ -1662,6 +1668,7 @@ MqttClient.prototype._handleAck = function (packet) {
16621668
}
16631669
}
16641670
}
1671+
delete this.messageIdToTopic[messageId]
16651672
this._invokeStoreProcessingQueue()
16661673
cb(null, packet)
16671674
break

test/abstract_client.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3274,4 +3274,43 @@ module.exports = function (server, config) {
32743274
})
32753275
})
32763276
})
3277+
3278+
describe('message id to subscription topic mapping', () => {
3279+
it('should not create a mapping if resubscribe is disabled', function (done) {
3280+
const client = connect({ resubscribe: false })
3281+
client.subscribe('test1')
3282+
client.subscribe('test2')
3283+
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0)
3284+
client.end(true, done)
3285+
})
3286+
3287+
it('should create a mapping for each subscribe call', function (done) {
3288+
const client = connect()
3289+
client.subscribe('test1')
3290+
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 1)
3291+
client.subscribe('test2')
3292+
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 2)
3293+
3294+
client.subscribe(['test3', 'test4'])
3295+
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 3)
3296+
client.subscribe(['test5', 'test6'])
3297+
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 4)
3298+
3299+
client.end(true, done)
3300+
})
3301+
3302+
it('should remove the mapping after suback', function (done) {
3303+
const client = connect()
3304+
client.once('connect', function () {
3305+
client.subscribe('test1', { qos: 2 }, function () {
3306+
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0)
3307+
3308+
client.subscribe(['test2', 'test3'], { qos: 2 }, function () {
3309+
assert.strictEqual(Object.keys(client.messageIdToTopic).length, 0)
3310+
client.end(done)
3311+
})
3312+
})
3313+
})
3314+
})
3315+
})
32773316
}

test/client.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,19 @@ describe('MqttClient', function () {
4343
done()
4444
}
4545
})
46+
47+
it('should disable number cache if specified in options', function (done) {
48+
try {
49+
assert.isTrue(mqttPacket.writeToStream.cacheNumbers)
50+
client = mqtt.MqttClient(function () {
51+
throw Error('break')
52+
}, { writeCache: false })
53+
client.end()
54+
} catch (err) {
55+
assert.isFalse(mqttPacket.writeToStream.cacheNumbers)
56+
done()
57+
}
58+
})
4659
})
4760

4861
describe('message ids', function () {
@@ -83,7 +96,7 @@ describe('MqttClient', function () {
8396
const max = 1000
8497
let count = 0
8598
const duplex = new Duplex({
86-
read: function (n) {},
99+
read: function (n) { },
87100
write: function (chunk, enc, cb) {
88101
parser.parse(chunk)
89102
cb() // nothing to do
@@ -300,7 +313,7 @@ describe('MqttClient', function () {
300313
})
301314

302315
const server2 = new MqttServer(function (serverClient) {
303-
serverClient.on('error', function () {})
316+
serverClient.on('error', function () { })
304317
debug('setting serverClient connect callback')
305318
serverClient.on('connect', function (packet) {
306319
if (packet.clientId === 'invalid') {
@@ -397,7 +410,7 @@ describe('MqttClient', function () {
397410

398411
const server2 = net.createServer(function (stream) {
399412
const serverClient = new Connection(stream)
400-
serverClient.on('error', function () {})
413+
serverClient.on('error', function () { })
401414
serverClient.on('connect', function (packet) {
402415
if (packet.clientId === 'invalid') {
403416
serverClient.connack({ returnCode: 2 })

0 commit comments

Comments
 (0)