Skip to content

Commit 0b45b71

Browse files
committed
feat: rework UDP cluster manager message handling - create a thread to process messages without blocking event loop
1 parent 25cbb2c commit 0b45b71

File tree

6 files changed

+483
-234
lines changed

6 files changed

+483
-234
lines changed

package.json

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
"scripts": {
1414
"benchmark": "node benchmark -c $(( $(nproc) - 2 )) -m 100000",
1515
"prepare": "./node_modules/.bin/tsc",
16-
"test": "./node_modules/.bin/tsc && ./node_modules/.bin/nyc mocha && ./node_modules/.bin/nyc report --reporter=text-lcov",
17-
"test-fast": "./node_modules/.bin/tsc && ./node_modules/.bin/nyc mocha && /usr/bin/env node -e \"import('open').then(open => open.default('file://`pwd`/coverage/index.html', { wait: false }))\"",
16+
"test": "./node_modules/.bin/tsc && ./node_modules/.bin/nyc mocha --exit --timeout 10000 && ./node_modules/.bin/nyc report --reporter=text-lcov",
17+
"test-fast": "./node_modules/.bin/tsc && ./node_modules/.bin/nyc mocha --exit --timeout 10000 && /usr/bin/env node -e \"import('open').then(open => open.default('file://`pwd`/coverage/index.html', { wait: false }))\"",
1818
"test-local": "export COVERALLS_REPO_TOKEN=$IMQ_COVERALLS_TOKEN && npm test && /usr/bin/env node -e \"import('open').then(open => open.default('https://coveralls.io/github/imqueue/imq', { wait: false }))\"",
1919
"test-dev": "npm run test && npm run clean-js && npm run clean-typedefs && npm run clean-maps",
2020
"test-coverage": "cat ./coverage/lcov.info | CODECLIMATE_API_HOST=https://codebeat.co/webhooks/code_coverage CODECLIMATE_REPO_TOKEN=85bb2a18-4ebb-4e48-a2ce-92b7bf438b1a ./node_modules/.bin/codeclimate-test-reporter",
@@ -80,7 +80,9 @@
8080
],
8181
"recursive": true,
8282
"bail": true,
83-
"full-trace": true
83+
"full-trace": true,
84+
"exit": true,
85+
"timeout": 10000
8486
},
8587
"nyc": {
8688
"check-coverage": false,

src/ClusteredRedisQueue.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -502,15 +502,14 @@ export class ClusteredRedisQueue implements IMessageQueue,
502502
return;
503503
}
504504

505-
if (remove.imq) {
506-
this.imqs = this.imqs.filter(imq => remove.imq !== imq);
507-
remove.imq.destroy().catch();
508-
}
505+
const imqToRemove = remove.imq;
509506

510-
this.clusterEmitter.emit('remove', {
511-
server: remove,
512-
imq: remove.imq,
513-
});
507+
if (imqToRemove) {
508+
this.imqs = this.imqs.filter(
509+
imq => imqToRemove.redisKey !== imq.redisKey
510+
);
511+
imqToRemove.destroy().catch();
512+
}
514513

515514
this.queueLength = this.imqs.length;
516515
this.servers = this.servers.filter(
@@ -519,12 +518,22 @@ export class ClusteredRedisQueue implements IMessageQueue,
519518
server,
520519
),
521520
);
521+
this.clusterEmitter.emit('remove', {
522+
server: remove,
523+
imq: imqToRemove,
524+
});
522525
}
523526

524527
private addServerWithQueueInitializing(
525528
server: ClusterServer,
526529
initializeQueue: boolean = true,
527530
): ClusterServer {
531+
const existingServer = this.findServer(server);
532+
533+
if (existingServer) {
534+
return existingServer;
535+
}
536+
528537
const newServer: ClusterServer = {
529538
id: server.id,
530539
host: server.host,

src/RedisQueue.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ export class RedisQueue extends EventEmitter<EventMap>
201201
/**
202202
* This queue instance unique key (identifier), for internal use
203203
*/
204-
private readonly redisKey: string;
204+
public readonly redisKey: string;
205205

206206
/**
207207
* LUA scripts for redis
@@ -697,6 +697,7 @@ export class RedisQueue extends EventEmitter<EventMap>
697697
),
698698
retryStrategy: this.retryStrategy(context),
699699
autoResubscribe: true,
700+
enableReadyCheck: true,
700701
});
701702

702703
context[channel] = makeRedisSafe(redis);

0 commit comments

Comments
 (0)