Skip to content

Commit fe939da

Browse files
remove service_amqp.js
1 parent b4f2b92 commit fe939da

10 files changed

+514
-7
lines changed

index.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ module.exports = function () {
1313
container.register("servicePubQueueRabbit", require("./lib/service_pub_queue_rabbit"));
1414
container.register("serviceSubQueueRabbit", require("./lib/service_sub_queue_rabbit"));
1515
container.alias("serviceSubQueue", "serviceSubQueueRabbit");
16-
17-
container.register("amqp", require("./lib/service_amqp"));
1816
},
1917
*onInit(container) {
2018
let service = yield container.resolve("service");
@@ -30,4 +28,4 @@ module.exports = function () {
3028
service.addModule("sub-queue-rabbit", serviceSubQueueRabbit);
3129
}
3230
};
33-
};
31+
};

lib/service_pub_queue_rabbit.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
const { Component, AsyncEmitter } = require("merapi");
44
const pack = require("../package");
55

6+
const Rabbit = require("./Rabbit");
7+
68
class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) {
79

8-
constructor(config, logger, injector, amqp, servicePubQueue) {
10+
constructor(config, logger, injector, servicePubQueue) {
911
super();
1012

1113
this.config = config;

lib/service_pub_rabbit.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
const { Component, AsyncEmitter } = require("merapi");
44
const pack = require("../package");
55

6+
const Rabbit = require("./Rabbit");
7+
68
class ServicePubRabbit extends Component.mixin(AsyncEmitter) {
79

8-
constructor(config, logger, injector, amqp, servicePub) {
10+
constructor(config, logger, injector, servicePub) {
911
super();
1012

1113
this.config = config;

lib/service_sub_queue_rabbit.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ const { async, utils } = require("merapi");
44
const ServiceSubQueue = require("merapi-plugin-service/lib/service_sub_queue");
55
const pack = require("../package");
66

7+
const Rabbit = require("./Rabbit");
8+
79
class ServiceSubRabbit extends ServiceSubQueue {
810

9-
constructor(config, logger, injector, amqp) {
11+
constructor(config, logger, injector) {
1012
super(config, logger, injector);
1113

1214
this.config = config;

lib/service_sub_rabbit.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ const ServiceSub = require("merapi-plugin-service/lib/service_sub");
55
const pack = require("../package");
66
const request = require("requestretry");
77

8+
const Rabbit = require("./Rabbit");
9+
810
class ServiceSubRabbit extends ServiceSub {
911

10-
constructor(config, logger, injector, amqp, servicePubRabbit) {
12+
constructor(config, logger, injector, servicePubRabbit) {
1113
super(config, logger, injector);
1214

1315
this.config = config;
File renamed without changes.
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"use strict";
2+
3+
const { Component, AsyncEmitter } = require("merapi");
4+
const pack = require("../package");
5+
6+
class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) {
7+
8+
constructor(config, logger, injector, amqp, servicePubQueue) {
9+
super();
10+
11+
this.config = config;
12+
this.logger = logger;
13+
this.injector = injector;
14+
this.amqp = amqp;
15+
this.servicePubQueue = servicePubQueue;
16+
17+
this.SERVICE_NAME = config.default("name", "unnamed-service");
18+
this.VERSION = pack.version;
19+
20+
this._status = "ok";
21+
this._initializing = false;
22+
this._connection = null;
23+
this._channels = {};
24+
this._registry = {};
25+
this._namespace = config.default("service.rabbit.namespace", "default");
26+
27+
this.servicePubQueue.on("triggerQueue", this.publishEvent.bind(this));
28+
this.amqp.on("connected", () => {
29+
this.init();
30+
});
31+
}
32+
33+
*initialize() {
34+
if (this.amqp.getConnection())
35+
this.init();
36+
}
37+
38+
*init() {
39+
if (this._initializing) return;
40+
this._initializing = true;
41+
42+
this._connection = this.amqp.getConnection();
43+
44+
let desc = this.config.default("service.queue.publish", {});
45+
46+
for (let service in desc) {
47+
for (let event in desc[service]) {
48+
this.createPublisher(service, event);
49+
}
50+
}
51+
52+
Object.assign(this._registry, this.config.default("service.registry", {}));
53+
54+
this._initializing = false;
55+
}
56+
57+
publishEvent(service, event, payload) {
58+
let channel = this._channels[service] && this._channels[service][event];
59+
if (channel) {
60+
let queueName = `${this._namespace}.queue.${service}.${event}`;
61+
let content = JSON.stringify(payload);
62+
channel.sendToQueue(queueName, Buffer.from(content), { persistent: true });
63+
}
64+
}
65+
66+
*createPublisher(service, event) {
67+
let channel = yield this._connection.createChannel();
68+
let queueName = `${this._namespace}.queue.${service}.${event}`;
69+
70+
if (!this._channels[service])
71+
this._channels[service] = {};
72+
73+
this._channels[service][event] = channel;
74+
75+
yield channel.assertQueue(queueName, { durable: true });
76+
}
77+
78+
info() {
79+
return {
80+
version: this.VERSION,
81+
status: this._status
82+
};
83+
}
84+
85+
status() {
86+
return this._status;
87+
}
88+
89+
extension() {
90+
return {};
91+
}
92+
93+
*mount() { }
94+
95+
*unmount() { }
96+
}
97+
98+
module.exports = ServicePubQueueRabbit;

old-lib/service_pub_rabbit.js

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"use strict";
2+
3+
const { Component, AsyncEmitter } = require("merapi");
4+
const pack = require("../package");
5+
6+
class ServicePubRabbit extends Component.mixin(AsyncEmitter) {
7+
8+
constructor(config, logger, injector, amqp, servicePub) {
9+
super();
10+
11+
this.config = config;
12+
this.logger = logger;
13+
this.injector = injector;
14+
this.amqp = amqp;
15+
this.servicePub = servicePub;
16+
17+
this.SERVICE_NAME = config.default("name", "unnamed-service");
18+
this.VERSION = pack.version;
19+
20+
this._status = "ok";
21+
this._initializing = false;
22+
this._connection = null;
23+
this._namespace = config.default("service.rabbit.namespace", "default");
24+
25+
this.servicePub.on("trigger", this.publishEvent.bind(this));
26+
this.amqp.on("connected", () => {
27+
this.init();
28+
});
29+
}
30+
31+
*initialize() {
32+
if (this.amqp.getConnection())
33+
this.init();
34+
}
35+
36+
*init() {
37+
if (this._initializing) return;
38+
this._initializing = true;
39+
40+
this._connection = this.amqp.getConnection();
41+
this._eventList = [];
42+
this._channels = {};
43+
44+
let desc = this.config.default("service.publish", {});
45+
for (let i in desc) {
46+
this._eventList.push(i);
47+
yield this.createPublisher(i, desc[i]);
48+
}
49+
50+
this._initializing = false;
51+
}
52+
53+
publishEvent(event, payload) {
54+
let channel = this._channels[event];
55+
if (channel && this._eventList.includes(event)) {
56+
let exchangeName = this._namespace + "." + this.SERVICE_NAME + "." + event;
57+
let content = JSON.stringify(payload);
58+
channel.publish(exchangeName, "", Buffer.from(content), { persistent: true });
59+
}
60+
}
61+
62+
*createPublisher(event) {
63+
let channel = yield this._connection.createChannel();
64+
let exchangeName = this._namespace + "." + this.SERVICE_NAME + "." + event;
65+
this._channels[event] = channel;
66+
67+
yield channel.assertExchange(exchangeName, "fanout", { durable: true });
68+
}
69+
70+
info() {
71+
return {
72+
version: this.VERSION,
73+
status: this._status
74+
};
75+
}
76+
77+
status() {
78+
return this._status;
79+
}
80+
81+
extension() {
82+
return {
83+
exchanges: this._eventList.map((event) => this.SERVICE_NAME + "." + event)
84+
};
85+
}
86+
87+
*mount(service) {
88+
service;
89+
}
90+
91+
*unmount(service) {
92+
service;
93+
}
94+
95+
getEventList() {
96+
return this._eventList;
97+
}
98+
}
99+
100+
module.exports = ServicePubRabbit;
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"use strict";
2+
3+
const { async, utils } = require("merapi");
4+
const ServiceSubQueue = require("merapi-plugin-service/lib/service_sub_queue");
5+
const pack = require("../package");
6+
7+
class ServiceSubRabbit extends ServiceSubQueue {
8+
9+
constructor(config, logger, injector, amqp) {
10+
super(config, logger, injector);
11+
12+
this.config = config;
13+
this.logger = logger;
14+
this.injector = injector;
15+
this.amqp = amqp;
16+
17+
this.SERVICE_NAME = config.default("name", "unnamed-service");
18+
this.VERSION = pack.version;
19+
20+
this._status = "ok";
21+
this._initializing = false;
22+
this._connection = null;
23+
this._namespace = config.default("service.rabbit.namespace", "default");
24+
25+
this.amqp.on("connected", () => {
26+
this.init();
27+
});
28+
}
29+
30+
*initialize() {
31+
if (this.amqp.getConnection())
32+
this.init();
33+
}
34+
35+
*init() {
36+
if (this._initializing) return;
37+
this._initializing = true;
38+
39+
this._connection = this.amqp.getConnection();
40+
41+
this._subscribers = {};
42+
this._channels = {};
43+
this._queues = [];
44+
45+
let desc = this.config.default("service.queue.subscribe", {});
46+
let prefetch = parseInt(this.config.default("service.rabbit.prefetch", 5));
47+
48+
for (let event in desc) {
49+
let channel = yield this._connection.createChannel();
50+
yield channel.prefetch(prefetch);
51+
this._channels[event] = channel;
52+
53+
let method = yield this.injector.resolveMethod(desc[event]);
54+
this.createQueue(event, method);
55+
}
56+
}
57+
58+
*createQueue(event, method) {
59+
let channel = this._channels[event];
60+
61+
if (channel) {
62+
let queueName = `${this._namespace}.queue.${this.SERVICE_NAME}.${event}`;
63+
64+
yield channel.assertQueue(queueName, { durable: true });
65+
66+
channel.consume(queueName, async(function* (message) {
67+
try {
68+
let payload = JSON.parse(message.content.toString());
69+
let ret = yield this.runMethod(method, payload);
70+
71+
channel.ack(message);
72+
}
73+
catch (e) {
74+
channel.nack(message);
75+
}
76+
}.bind(this)));
77+
78+
}
79+
}
80+
81+
*runMethod(method, payload) {
82+
let ret = method(payload);
83+
if (utils.isPromise(ret)) yield ret;
84+
85+
return ret;
86+
}
87+
88+
extension() {
89+
return {};
90+
}
91+
92+
*mount() { }
93+
94+
*unmount() { }
95+
}
96+
97+
module.exports = ServiceSubRabbit;

0 commit comments

Comments
 (0)