Skip to content

Commit 5dab0f5

Browse files
add wrapper that can reconnect
1 parent 3b03a37 commit 5dab0f5

File tree

3 files changed

+141
-0
lines changed

3 files changed

+141
-0
lines changed

labs/publisher.js

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
const amqp = require('amqp-connection-manager');
2+
3+
const QUEUE_NAME = 'amqp-connection-manager-sample2'
4+
const EXCHANGE_NAME = 'amqp-connection-manager-sample2-ex';
5+
6+
// Create a connetion manager
7+
const connection = amqp.connect(["amqp://root:[email protected]:5672/"], {json: true});
8+
connection.on('connect', () => console.log('Connected!'));
9+
connection.on('disconnect', params => console.log('Disconnected.', params.err.stack));
10+
11+
// Create a channel wrapper
12+
const channelWrapper = connection.createChannel({
13+
json: true,
14+
setup: channel => channel.assertExchange(EXCHANGE_NAME, 'topic')
15+
});
16+
17+
// Send messages until someone hits CTRL-C or something goes wrong...
18+
function sendMessage() {
19+
const msg = {time: Date.now()};
20+
channelWrapper.publish(EXCHANGE_NAME, "test", msg, { contentType: 'application/json', persistent: true })
21+
.then(function() {
22+
console.log("Message sent", msg);
23+
})
24+
.then(() => {
25+
return new Promise((resolve, reject) => {
26+
setTimeout(() => resolve(), 1000);
27+
});
28+
})
29+
.then(() => sendMessage())
30+
.catch(err => {
31+
console.log("Message was rejected:", err.stack);
32+
channelWrapper.close();
33+
connection.close();
34+
});
35+
};
36+
37+
console.log("Sending messages...");
38+
sendMessage();

labs/subscriber.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
const amqp = require('amqp-connection-manager');
2+
3+
const QUEUE_NAME = 'amqp-connection-manager-sample2'
4+
const EXCHANGE_NAME = 'amqp-connection-manager-sample2-ex';
5+
6+
// Handle an incomming message.
7+
const onMessage = data => {
8+
var message = JSON.parse(data.content.toString());
9+
console.log("subscriber: got message", message);
10+
channelWrapper.ack(data);
11+
}
12+
13+
// Create a connetion manager
14+
const connection = amqp.connect(["amqp://root:[email protected]:5672/"], {json: true});
15+
connection.on('connect', () => console.log('Connected!'));
16+
connection.on('disconnect', params => console.log('Disconnected.', params.err.stack));
17+
18+
// Set up a channel listening for messages in the queue.
19+
var channelWrapper = connection.createChannel({
20+
setup: channel =>
21+
// `channel` here is a regular amqplib `ConfirmChannel`.
22+
Promise.all([
23+
channel.assertQueue(QUEUE_NAME, { exclusive: true, autoDelete: true }),
24+
channel.assertExchange(EXCHANGE_NAME, 'topic'),
25+
channel.prefetch(1),
26+
channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, '#'),
27+
channel.consume(QUEUE_NAME, onMessage)
28+
])
29+
});
30+
31+
channelWrapper.waitForConnect()
32+
.then(function() {
33+
console.log("Listening for messages");
34+
});

labs/test1.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
const q = 'tasks';
2+
3+
const { execSync } = require("child_process");
4+
const amqplib = require("amqplib");
5+
6+
async function createConnection() {
7+
const id = Math.round(Math.random() * 1000);
8+
console.log(`* connect ${id}`);
9+
const pconn = amqplib.connect("amqp://root:[email protected]:5672/").then((conn) => {
10+
conn.on("close", (error) => {
11+
console.log(`* connection closed ${id}:`, error);
12+
});
13+
conn.on("error", (error) => {
14+
console.log(`* connection error ${id}:`, error);
15+
});
16+
return conn;
17+
});
18+
console.log(`* connected ${id}`);
19+
return pconn;
20+
}
21+
22+
async function createChannel(conn) {
23+
const id = Math.round(Math.random() * 1000);
24+
console.log(`* create channel ${id}`);
25+
const channel = conn.createChannel({durable: false});
26+
console.log(`* channel created ${id}`);
27+
return channel;
28+
}
29+
30+
async function createConnectionAndChannel() {
31+
const conn = await createConnection();
32+
const channel = await createChannel(conn);
33+
return channel;
34+
}
35+
36+
async function main(channels) {
37+
38+
if (!channels) {
39+
channels = await Promise.all([createConnectionAndChannel(), createConnectionAndChannel()]);
40+
}
41+
const [publisherChannel, consumerChannel] = channels;
42+
43+
// publisher
44+
console.log("publish");
45+
await publisherChannel.assertQueue(q).then(function(ok) {
46+
console.log("published");
47+
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
48+
});
49+
50+
// consumer
51+
console.log("consume");
52+
await consumerChannel.assertQueue(q).then(function(ok) {
53+
console.log("consumed");
54+
return consumerChannel.consume(q, function(msg) {
55+
if (msg !== null) {
56+
console.log(msg.content.toString());
57+
consumerChannel.ack(msg);
58+
}
59+
});
60+
});
61+
62+
await new Promise((resolve, reject) => {
63+
setTimeout(resolve, 1000);
64+
});
65+
66+
await main(channels);
67+
}
68+
69+
main();

0 commit comments

Comments
 (0)