Skip to content

Commit 6b2db7c

Browse files
add reconnect event
1 parent 6e40b08 commit 6b2db7c

File tree

9 files changed

+224
-146
lines changed

9 files changed

+224
-146
lines changed

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,3 @@ coverage
55
.vscode
66
.nyc_output
77
test/test.env
8-
labs/

labs/publisher.js

Lines changed: 0 additions & 38 deletions
This file was deleted.

labs/reconnect.js

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"use strict";
2+
3+
const chai = require("chai");
4+
const expect = chai.expect;
5+
const request = require("supertest");
6+
const sleep = require("then-sleep");
7+
const amqplib = require("amqplib");
8+
const { execSync } = require("child_process");
9+
10+
const merapi = require("merapi");
11+
const { async, Component } = require("merapi");
12+
13+
const { rabbitConnection, rabbitUrl, startRabbitCommand, stopRabbitCommand } = require("../test/configuration.js");
14+
15+
/* eslint-env mocha */
16+
17+
describe("Merapi Plugin Service: Queue Subscriber", function() {
18+
let publisherContainer, subscriberAContainer, subscriberBContainer;
19+
let service = {};
20+
let connection = {};
21+
let channel = {};
22+
let messageA = [];
23+
24+
beforeEach(async(function*() {
25+
this.timeout(5000);
26+
27+
let publisherConfig = {
28+
name: "publisher",
29+
version: "1.0.0",
30+
main: "mainCom",
31+
plugins: ["service"],
32+
service: {
33+
rabbit: rabbitConnection,
34+
queue: {
35+
publish: {
36+
subscriber: {
37+
sub_queue_reconnect_publisher_test: "inQueuePublisherTest",
38+
},
39+
},
40+
},
41+
port: 5135,
42+
},
43+
};
44+
45+
let subscriberConfig = {
46+
name: "subscriber",
47+
version: "1.0.0",
48+
main: "mainCom",
49+
plugins: ["service"],
50+
service: {
51+
rabbit: rabbitConnection,
52+
queue: {
53+
subscribe: {
54+
sub_queue_reconnect_publisher_test: "mainCom.handleIncomingMessage",
55+
},
56+
},
57+
},
58+
};
59+
60+
publisherContainer = merapi({
61+
basepath: __dirname,
62+
config: publisherConfig,
63+
});
64+
65+
publisherContainer.registerPlugin(
66+
"service-rabbit",
67+
require("../index.js")(publisherContainer)
68+
);
69+
publisherContainer.register(
70+
"mainCom",
71+
class MainCom extends Component {
72+
start() {}
73+
}
74+
);
75+
yield publisherContainer.start();
76+
77+
subscriberConfig.service.port = 5212;
78+
subscriberAContainer = merapi({
79+
basepath: __dirname,
80+
config: subscriberConfig,
81+
});
82+
83+
subscriberAContainer.registerPlugin(
84+
"service-rabbit",
85+
require("../index.js")(subscriberAContainer)
86+
);
87+
subscriberAContainer.register(
88+
"mainCom",
89+
class MainCom extends Component {
90+
start() {}
91+
*handleIncomingMessage(payload) {
92+
messageA.push(payload);
93+
}
94+
}
95+
);
96+
yield subscriberAContainer.start();
97+
98+
service = yield subscriberAContainer.resolve("service");
99+
connection = yield amqplib.connect(rabbitUrl);
100+
channel = yield connection.createChannel();
101+
102+
yield sleep(100);
103+
}));
104+
105+
afterEach(async(function*() {
106+
yield subscriberAContainer.stop();
107+
yield channel.close();
108+
yield connection.close();
109+
}));
110+
111+
describe("Subscriber service", function() {
112+
113+
describe("when subscribing event", function() {
114+
it("published event should be caught", async(function*() {
115+
this.timeout(5000);
116+
let trigger = yield publisherContainer.resolve("inQueuePublisherTest");
117+
118+
// send "0"
119+
yield sleep(100);
120+
yield trigger(0);
121+
yield sleep(1000);
122+
// messageA should be [0]
123+
expect(messageA).to.deep.equal([0]);
124+
125+
execSync(stopRabbitCommand);
126+
execSync(startRabbitCommand);
127+
128+
// send "1"
129+
yield sleep(100);
130+
yield trigger(1);
131+
yield sleep(1000);
132+
// messageA should be [1]
133+
expect(messageA).to.deep.equal([0, 1]);
134+
135+
}));
136+
});
137+
138+
});
139+
});

labs/subscriber.js

Lines changed: 0 additions & 34 deletions
This file was deleted.

labs/test1.js

Lines changed: 65 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,79 @@
1-
const q = 'tasks';
2-
31
const { execSync } = require("child_process");
4-
const amqplib = require("amqplib");
2+
const amqp = require("amqplib");
53

6-
async function createConnection() {
7-
const id = Math.round(Math.random() * 1000);
8-
console.log(`* connect ${id}`);
9-
const pconn = amqplib.connect("amqp://root:[email protected]:5672/").then((conn) => {
10-
conn.on("close", (error) => {
11-
console.log(`* connection closed ${id}:`, error);
12-
});
13-
conn.on("error", (error) => {
14-
console.log(`* connection error ${id}:`, error);
15-
});
16-
return conn;
4+
async function sleep(delay) {
5+
return new Promise((resolve, reject) => {
6+
setTimeout(resolve, delay);
177
});
18-
console.log(`* connected ${id}`);
19-
return pconn;
20-
}
21-
22-
async function createChannel(conn) {
23-
const id = Math.round(Math.random() * 1000);
24-
console.log(`* create channel ${id}`);
25-
const channel = conn.createChannel({durable: false});
26-
console.log(`* channel created ${id}`);
27-
return channel;
288
}
299

30-
async function createConnectionAndChannel() {
31-
const conn = await createConnection();
32-
const channel = await createChannel(conn);
33-
return channel;
34-
}
35-
36-
async function main(channels) {
37-
38-
if (!channels) {
39-
channels = await Promise.all([createConnectionAndChannel(), createConnectionAndChannel()]);
10+
async function createChannel(config) {
11+
const { url, publishers, listeners } = Object.assign({url: "", publishers: {}, listeners: {}}, config);
12+
try {
13+
// create connection
14+
const connection = await amqp.connect(url);
15+
let channel = null;
16+
connection._channels = [];
17+
connection.on("error", (error) => {
18+
console.error("Connection error : ", config, error);
19+
});
20+
connection.on("close", async (error) => {
21+
if (channel) {
22+
channel.close();
23+
}
24+
console.error("Connection close : ", config, error);
25+
await sleep(1000);
26+
createChannel(config);
27+
});
28+
// create channel
29+
channel = await connection.createConfirmChannel();
30+
channel.on("error", (error) => {
31+
console.error("Channel error : ", config, error);
32+
});
33+
channel.on("close", (error) => {
34+
console.error("Channel close : ", config, error);
35+
});
36+
// register listeners
37+
for (queue in listeners) {
38+
const callback = listeners[queue];
39+
channel.assertQueue(queue, { durable: false });
40+
channel.consume(queue, callback);
41+
}
42+
// publish
43+
for (queue in publishers) {
44+
const message = publishers[queue];
45+
channel.assertQueue(queue, { durable: false });
46+
channel.sendToQueue(queue, message);
47+
}
48+
return channel;
49+
} catch (error) {
50+
console.error("Create connection error : ", error);
51+
await sleep(1000);
52+
createChannel(config);
4053
}
41-
const [publisherChannel, consumerChannel] = channels;
54+
}
4255

43-
// publisher
44-
console.log("publish");
45-
await publisherChannel.assertQueue(q).then(function(ok) {
46-
console.log("published");
47-
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
56+
async function main() {
57+
const channelPublish = await createChannel({
58+
url: "amqp://root:[email protected]:5672",
59+
publishers: {
60+
"queue": Buffer.from("hello"),
61+
}
4862
});
4963

50-
// consumer
51-
console.log("consume");
52-
await consumerChannel.assertQueue(q).then(function(ok) {
53-
console.log("consumed");
54-
return consumerChannel.consume(q, function(msg) {
55-
if (msg !== null) {
56-
console.log(msg.content.toString());
57-
consumerChannel.ack(msg);
58-
}
59-
});
60-
});
64+
execSync("docker stop rabbitmq");
65+
execSync("docker start rabbitmq");
6166

62-
await new Promise((resolve, reject) => {
63-
setTimeout(resolve, 1000);
67+
const channelConsume = await createChannel({
68+
url: "amqp://root:[email protected]:5672",
69+
listeners: {
70+
"queue": (message) => {
71+
console.log("Receive message ", message.content.toString());
72+
},
73+
}
6474
});
6575

66-
await main(channels);
76+
return true;
6777
}
6878

69-
main();
79+
main().catch((error) => console.error(error));

0 commit comments

Comments
 (0)