Skip to content

Commit 916b9a8

Browse files
authored
fix(headers): remove 'expires' header (#8)
this PR will remove the `expires` header since it will introduce another bug
1 parent d61edba commit 916b9a8

10 files changed

+2859
-722
lines changed

.gitignore

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,3 @@ config.json
33
.idea
44
coverage
55
.vscode
6-
yarn.lock
7-
package-lock.json

lib/service_pub_queue_rabbit.js

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,9 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) {
2020
this._status = "ok";
2121
this._initializing = false;
2222
this._connection = null;
23-
this._withExpire = true;
2423
this._channels = {};
2524
this._registry = {};
2625
this._namespace = config.default("service.rabbit.namespace", "default");
27-
this.expireTime = this.config.default("service.rabbit.expireTime", 1000 * 60 * 60 * 24); // 1 day
2826

2927
this.servicePubQueue.on("triggerQueue", this.publishEvent.bind(this));
3028
this.amqp.on("connected", () => {
@@ -74,28 +72,7 @@ class ServicePubQueueRabbit extends Component.mixin(AsyncEmitter) {
7472

7573
this._channels[service][event] = channel;
7674

77-
const queueOpts = {
78-
durable: true,
79-
};
80-
81-
if (this._withExpire) {
82-
Object.assign(queueOpts, { expires: this.expireTime });
83-
}
84-
85-
// by default, assert queue with `expires` header. if queue is exists and
86-
// don't have `expires` header, rabbitMQ will throw an error and close channel.
87-
// we catch the error on `close` or `error` emit (amqp.node lib emit those error)
88-
// and re-init this class
89-
try {
90-
yield channel.assertQueue(queueName, queueOpts);
91-
} catch (e) {
92-
this._initializing = false;
93-
94-
// dont use `expires` header on re-init
95-
this._withExpire = false;
96-
this._connection.on("close", this.init.bind(this));
97-
this._connection.on("error", this.init.bind(this));
98-
}
75+
yield channel.assertQueue(queueName, { durable: true });
9976
}
10077

10178
info() {

lib/service_sub_queue_rabbit.js

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ class ServiceSubRabbit extends ServiceSubQueue {
2020
this._status = "ok";
2121
this._initializing = false;
2222
this._connection = null;
23-
this._withExpire = true;
2423
this._namespace = config.default("service.rabbit.namespace", "default");
25-
this.expireTime = this.config.default("service.rabbit.expireTime", 1000 * 60 * 60 * 24); // 1 day
2624

2725
this.amqp.on("connected", () => {
2826
this.init();
@@ -63,30 +61,7 @@ class ServiceSubRabbit extends ServiceSubQueue {
6361
if (channel) {
6462
let queueName = `${this._namespace}.queue.${this.SERVICE_NAME}.${event}`;
6563

66-
const queueOpts = {
67-
durable: true,
68-
};
69-
70-
if (this._withExpire) {
71-
Object.assign(queueOpts, { expires: this.expireTime });
72-
}
73-
74-
// by default, assert queue with `expires` header. if queue is exists and
75-
// don't have `expires` header, rabbitMQ will throw an error and close channel.
76-
// we catch the error on `close` or `error` emit (amqp.node lib emit those error)
77-
// and re-init this class
78-
try {
79-
yield channel.assertQueue(queueName, queueOpts);
80-
} catch (e) {
81-
this.logger.error("Error service_sub_queue_rabbit createQueue", e);
82-
83-
this._initializing = false;
84-
85-
// dont use `expires` header on re-init
86-
this._withExpire = false;
87-
this._connection.on("close", this.init.bind(this));
88-
this._connection.on("error", this.init.bind(this));
89-
}
64+
yield channel.assertQueue(queueName, { durable: true });
9065

9166
channel.consume(queueName, async(function* (message) {
9267
try {

lib/service_sub_rabbit.js

Lines changed: 5 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,8 @@ class ServiceSubRabbit extends ServiceSub {
2121

2222
this.maxAttempts = this.config.default("service.rabbit.maxAttempts", 5);
2323
this.retryDelay = this.config.default("service.rabbit.retryDelay", 5000);
24-
this.expireTime = this.config.default("service.rabbit.expireTime", 1000 * 60 * 60 * 24); // 1 day
2524
this.remainingAttempts = this.maxAttempts - 1;
2625

27-
this._withExpire = true;
2826
this._status = "ok";
2927
this._initializing = false;
3028
this._connection = null;
@@ -37,9 +35,8 @@ class ServiceSubRabbit extends ServiceSub {
3735
}
3836

3937
*initialize() {
40-
if (this.amqp.getConnection()) {
38+
if (this.amqp.getConnection())
4139
this.init();
42-
}
4340
}
4441

4542
*init() {
@@ -98,28 +95,7 @@ class ServiceSubRabbit extends ServiceSub {
9895
let channel = this._channels[event];
9996
let queueName = `${this._namespace}.${this.SERVICE_NAME}.${event}`;
10097

101-
const queueOpts = {
102-
durable: true,
103-
};
104-
105-
if (this._withExpire) {
106-
Object.assign(queueOpts, { expires: this.expireTime });
107-
}
108-
109-
// by default, assert queue with `expires` header. if queue is exists and
110-
// don't have `expires` header, rabbitMQ will throw an error and close channel.
111-
// we catch the error on `close` or `error` emit (amqp.node lib emit those error)
112-
// and re-init this class
113-
try {
114-
yield channel.assertQueue(queueName, queueOpts);
115-
} catch (e) {
116-
this._initializing = false;
117-
118-
// dont use `expires` header on re-init
119-
this._withExpire = false;
120-
this._connection.on("close", this.init.bind(this));
121-
this._connection.on("error", this.init.bind(this));
122-
}
98+
yield channel.assertQueue(queueName, { durable: true });
12399

124100
channel.consume(queueName, async(function* (message) {
125101
try {
@@ -144,29 +120,7 @@ class ServiceSubRabbit extends ServiceSub {
144120
let queueName = this._namespace + "." + publisherName + "." + this.SERVICE_NAME + "." + event;
145121
let exchangeName = this._namespace + "." + publisherName + "." + event;
146122

147-
const queueOpts = {
148-
durable: true,
149-
};
150-
151-
if (this._withExpire) {
152-
Object.assign(queueOpts, { expires: this.expireTime });
153-
}
154-
155-
// by default, assert queue with `expires` header. if queue is exists and
156-
// don't have `expires` header, rabbitMQ will throw an error and close channel.
157-
// we catch the error on `close` or `error` emit (amqp.node lib emit those error)
158-
// and re-init this class
159-
try {
160-
yield channel.assertQueue(queueName, queueOpts);
161-
} catch (e) {
162-
this._initializing = false;
163-
164-
// dont use `expires` header on re-init
165-
this._withExpire = false;
166-
this._connection.on("close", this.init.bind(this));
167-
this._connection.on("error", this.init.bind(this));
168-
}
169-
123+
yield channel.assertQueue(queueName, { durable: true });
170124
yield channel.assertExchange(exchangeName, "fanout", { durable: true });
171125
yield channel.bindQueue(queueName, exchangeName, "");
172126
this._queues.push(queueName);
@@ -215,12 +169,12 @@ class ServiceSubRabbit extends ServiceSub {
215169
retryDelay: this.retryDelay,
216170
retryStrategy: this.retryStrategy.bind(this),
217171
fullResponse: false
218-
};
172+
}
219173

220174
if (this._secret != null) {
221175
jsonBody["headers"] = {
222176
"Authorization": `Bearer ${this._secret}`
223-
};
177+
}
224178
}
225179

226180
response = yield request(jsonBody);

0 commit comments

Comments
 (0)