Skip to content

Commit 3eadb47

Browse files
committed
improve dbs.toml, improve fanout, remove .multi() call
1 parent a59eca4 commit 3eadb47

File tree

2 files changed

+13
-8
lines changed

2 files changed

+13
-8
lines changed

config/dbs.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ sender = "zone-mta"
2626

2727
#queued="mail"
2828

29-
# Id to use for distributing notifications
30-
workerId = false # use os.hostname() instead
29+
# Worker identifier prefix for distributing notifications.
30+
# Each process appends :PID automatically. Set to false to use os.hostname().
31+
workerId = false
3132

3233
[redis]
3334
host = "127.0.0.1"

lib/imap-notifier.js

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,12 +337,8 @@ class ImapNotifier extends EventEmitter {
337337
let minScore = Date.now() - USER_WORKERS_TTL_MS;
338338
// send message only to workers that actually service this user
339339
this.redis
340-
.multi()
341-
.zremrangebyscore(key, 0, minScore) // remove stale workers
342340
.zrangebyscore(key, minScore, '+inf') // get active workers
343-
.exec()
344-
.then(res => {
345-
let workers = (res && res[1] && res[1][1]) || [];
341+
.then(workers => {
346342
if (!workers || !workers.length) {
347343
return;
348344
}
@@ -386,7 +382,12 @@ class ImapNotifier extends EventEmitter {
386382

387383
_registerUserWorker(userId) {
388384
let key = this._getUserWorkersKey(userId);
389-
this.redis.zadd(key, Date.now(), this.workerId).catch(() => false);
385+
this.redis
386+
.pipeline()
387+
.zadd(key, Date.now(), this.workerId)
388+
.expire(key, USER_WORKERS_TTL + 60)
389+
.exec()
390+
.catch(() => false);
390391
}
391392

392393
_unregisterUserWorker(userId) {
@@ -403,9 +404,12 @@ class ImapNotifier extends EventEmitter {
403404
let redis = this._userRegistryState.redis || this.redis;
404405
let pipeline = redis.pipeline();
405406
let now = Date.now();
407+
let minScore = now - USER_WORKERS_TTL_MS;
406408
for (let userId of counts.keys()) {
407409
let key = this._getUserWorkersKey(userId);
410+
pipeline.zremrangebyscore(key, 0, minScore);
408411
pipeline.zadd(key, now, this.workerId);
412+
pipeline.expire(key, USER_WORKERS_TTL + 60);
409413
}
410414
pipeline.exec().catch(() => false);
411415
}

0 commit comments

Comments
 (0)