Skip to content

Commit b937299

Browse files
authored
Merge pull request #21 from 123joshuawu/add-async-connect
Add connectAsync
2 parents fcc40b5 + f8ff21b commit b937299

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

index.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,5 +139,47 @@ module.exports = {
139139

140140
return asyncClient;
141141
},
142+
connectAsync (brokerURL, opts, allowRetries=true) {
143+
const client = mqtt.connect(brokerURL, opts);
144+
const asyncClient = new AsyncClient(client);
145+
146+
return new Promise((resolve, reject) => {
147+
// Listeners added to client to trigger promise resolution
148+
const promiseResolutionListeners = {
149+
connect: (connack) => {
150+
removePromiseResolutionListeners();
151+
resolve(asyncClient); // Resolve on connect
152+
},
153+
end: () => {
154+
removePromiseResolutionListeners();
155+
resolve(asyncClient); // Resolve on end
156+
},
157+
error: (err) => {
158+
removePromiseResolutionListeners();
159+
client.end();
160+
reject(err); // Reject on error
161+
}
162+
};
163+
164+
// If retries are not allowed, reject on close
165+
if (allowRetries === false) {
166+
promiseResolutionListeners.close = () => {
167+
promiseResolutionListeners.error("Couldn't connect to server");
168+
}
169+
}
170+
171+
// Remove listeners added to client by this promise
172+
const removePromiseResolutionListeners = () => {
173+
Object.keys(promiseResolutionListeners).forEach((eventName) => {
174+
client.off(eventName, promiseResolutionListeners[eventName]);
175+
});
176+
};
177+
178+
// Add listeners to client
179+
Object.keys(promiseResolutionListeners).forEach((eventName) => {
180+
client.on(eventName, promiseResolutionListeners[eventName]);
181+
});
182+
});
183+
},
142184
AsyncClient
143185
};

test.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,20 @@ function runTests () {
2727
})
2828
});
2929

30+
test('ConnectAsync should return AsyncClient after connection', t => {
31+
t.plan(1);
32+
33+
AsyncMQTT.connectAsync(SERVER_URL, {}, false).then((client) => {
34+
t.ok(client.connected, 'AsyncClient is connected');
35+
36+
client.end();
37+
}, (error) => {
38+
t.fail(error);
39+
});
40+
41+
42+
});
43+
3044
test('Should be able to listen on event on client', t => {
3145
t.plan(1);
3246

0 commit comments

Comments
 (0)