Skip to content

Commit cb8f1c3

Browse files
committed
Tests, fixes, docs
1 parent efef5cc commit cb8f1c3

File tree

4 files changed

+224
-9
lines changed

4 files changed

+224
-9
lines changed

README.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
Promise wrapper over MQTT.js
44

5-
**IMPORANT: Make sure you handle rejections from returned promises because otherwise you might not see them**
5+
**IMPORANT: Make sure you handle rejections from returned promises because they won't crash the process**
66

77
## API
88

@@ -12,3 +12,46 @@ The API is the same as [MQTT.js](https://github.com/mqttjs/MQTT.js#api), except
1212
- subscribe
1313
- unsubscribe
1414
- end
15+
16+
17+
## Example
18+
19+
```javascript
20+
var MQTT = require("async-mqtt");
21+
22+
var client = MQTT.connect("tcp://somehost.com:1883");
23+
24+
// WHen passing async functions as event listeners, make sure to have a try catch block
25+
client.on("connect", doStuff);
26+
27+
async function doStuff() {
28+
29+
console.log("Starting");
30+
try {
31+
await client.publish("wow/so/cool", "It works!");
32+
// This line doesn't run until the server responds to the publish
33+
await client.end();
34+
// This line doesn't run until the client has disconnected without error
35+
console.log("Done");
36+
} catch (e){
37+
// Do something about it!
38+
console.log(e.stack);
39+
process.exit();
40+
}
41+
}
42+
```
43+
44+
## Wrapping existing client
45+
46+
```javascript
47+
var AsyncClient = require("async-mqtt").AsyncClient;
48+
49+
var client = getRegularMQTTClientFromSomewhere();
50+
51+
var asyncClient = new AsyncClient(client);
52+
53+
asyncClient.publish("foo/bar", "baz").then(function(){
54+
console.log("We async now");
55+
return asyncClient.end();
56+
});
57+
```

index.js

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,27 @@
22
var mqtt = require("mqtt");
33
var inArray = require("in-array");
44

5-
var RegularClientPrototype = mqtt.client;
5+
var RegularClientPrototype = mqtt.MqttClient.prototype;
66

7-
var ASYNC_METHODS = ["publish", "subscribe", "unsubscribe", "unsubscribe", "end"];
7+
var ASYNC_METHODS = ["publish",
8+
"subscribe",
9+
"unsubscribe",
10+
"unsubscribe",
11+
"end"
12+
];
13+
14+
var SYNC_METHODS = [
15+
"emit",
16+
"addListener",
17+
"on",
18+
"once",
19+
"removeListener",
20+
"removeAllListeners",
21+
"setMaxListeners",
22+
"getMaxListeners",
23+
"listeners",
24+
"listenerCount"
25+
];
826

927
module.exports = {
1028
connect: connect,
@@ -32,11 +50,8 @@ AsyncClient.prototype = {
3250
}
3351
};
3452

35-
for (var name in RegularClientPrototype) {
36-
if (inArray(ASYNC_METHODS, name))
37-
defineAsync(name);
38-
else definePassthrough(name);
39-
}
53+
ASYNC_METHODS.forEach(defineAsync);
54+
SYNC_METHODS.forEach(definePassthrough);
4055

4156
function definePassthrough(name) {
4257
AsyncClient.prototype[name] = function() {
@@ -56,7 +71,7 @@ function defineAsync(name) {
5671

5772
return new Promise(function(resolve, reject) {
5873
args.push(makeCallback(resolve, reject));
59-
client.apply(client, args);
74+
client[name].apply(client, args);
6075
});
6176
};
6277
}

package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,9 @@
2626
"dependencies": {
2727
"in-array": "^0.1.2",
2828
"mqtt": "^2.3.1"
29+
},
30+
"devDependencies": {
31+
"mqtt-connection": "^3.0.0",
32+
"tape": "^4.6.3"
2933
}
3034
}

test.js

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
"use strict";
2+
3+
var Server = require("mqtt/test/server");
4+
5+
var AsyncMQTT = require("./");
6+
var AsyncClient = AsyncMQTT.AsyncClient;
7+
8+
var test = require("tape");
9+
10+
var SERVER_PORT = 1883;
11+
var SERVER_URL = "mqtt://localhost:" + SERVER_PORT;
12+
13+
var server = buildServer().listen(SERVER_PORT);
14+
server.unref();
15+
16+
server.on("listening", runTests);
17+
18+
function runTests() {
19+
test("Connect should return an instance of AsyncClient", function (t) {
20+
t.plan(1);
21+
var client = AsyncMQTT.connect(SERVER_URL);
22+
23+
t.ok(client instanceof AsyncClient, "Connect returned an AsyncClient");
24+
25+
client.end();
26+
});
27+
28+
test("Should be able to listen on event on client", function (t) {
29+
t.plan(1);
30+
31+
var client = AsyncMQTT.connect(SERVER_URL);
32+
33+
client.once("connect", function () {
34+
t.pass("Connected");
35+
client.end();
36+
});
37+
});
38+
39+
test("Calling end() should resolve once disconnected", function (t) {
40+
t.plan(2);
41+
42+
var client = AsyncMQTT.connect(SERVER_URL);
43+
44+
client.on("close", function () {
45+
t.pass("Close event occured");
46+
});
47+
48+
client.on("connect", function () {
49+
// Wait for connect to emit before ending
50+
client.end().then(function(){
51+
t.pass("End resolved");
52+
});
53+
});
54+
});
55+
56+
test("Calling subscribe should resolve once subscribed", function (t) {
57+
t.plan(1);
58+
59+
var client = AsyncMQTT.connect(SERVER_URL);
60+
61+
client.subscribe("example", {
62+
qos: 1
63+
}).then(function(){
64+
t.pass("Subscribed");
65+
client.end();
66+
})
67+
});
68+
69+
test("Calling unsubscribe should resolve once completed", function(t){
70+
t.plan(1);
71+
72+
var client = AsyncMQTT.connect(SERVER_URL);
73+
74+
client.subscribe("example", {
75+
qos: 1
76+
}).then(function(){
77+
return client.unsubscribe("example");
78+
}).then(function(){
79+
t.pass("Unsunbscribed");
80+
return client.end();
81+
});
82+
});
83+
84+
test("Calling publish should resolve once completed", function (t) {
85+
t.plan(1);
86+
87+
var client = AsyncMQTT.connect(SERVER_URL);
88+
89+
client.publish("example", "test", {
90+
qos: 1
91+
}).then(function(){
92+
t.pass("Published");
93+
return client.end();
94+
});
95+
});
96+
}
97+
98+
// Taken from MQTT.js tests
99+
function buildServer () {
100+
return new Server(function (client) {
101+
client.on('connect', function (packet) {
102+
if (packet.clientId === 'invalid') {
103+
client.connack({returnCode: 2})
104+
} else {
105+
client.connack({returnCode: 0})
106+
}
107+
})
108+
109+
client.on('publish', function (packet) {
110+
setImmediate(function () {
111+
switch (packet.qos) {
112+
case 0:
113+
break
114+
case 1:
115+
client.puback(packet)
116+
break
117+
case 2:
118+
client.pubrec(packet)
119+
break
120+
}
121+
})
122+
})
123+
124+
client.on('pubrel', function (packet) {
125+
client.pubcomp(packet)
126+
})
127+
128+
client.on('pubrec', function (packet) {
129+
client.pubrel(packet)
130+
})
131+
132+
client.on('pubcomp', function () {
133+
// Nothing to be done
134+
})
135+
136+
client.on('subscribe', function (packet) {
137+
client.suback({
138+
messageId: packet.messageId,
139+
granted: packet.subscriptions.map(function (e) {
140+
return e.qos
141+
})
142+
})
143+
})
144+
145+
client.on('unsubscribe', function (packet) {
146+
client.unsuback(packet)
147+
})
148+
149+
client.on('pingreq', function () {
150+
client.pingresp()
151+
})
152+
})
153+
}

0 commit comments

Comments
 (0)