Conversation
andris9
left a comment
There was a problem hiding this comment.
1. Set EXPIRE on sorted set keys to prevent orphan accumulation
The wd:imap:users:<userId> keys never get an EXPIRE. They're only cleaned inside fire(), so if a user disconnects and never receives another notification, the key lives forever.
In _registerUserWorker and in the _refreshUserWorkers pipeline, add expire after each zadd:
_registerUserWorker(userId) {
let key = this._getUserWorkersKey(userId);
this.redis.pipeline()
.zadd(key, Date.now(), this.workerId)
.expire(key, USER_WORKERS_TTL + 60)
.exec()
.catch(() => false);
}
And in _refreshUserWorkers:
for (let userId of counts.keys()) {
let key = this._getUserWorkersKey(userId);
pipeline.zadd(key, now, this.workerId);
pipeline.expire(key, USER_WORKERS_TTL + 60);
}
2. Move stale cleanup out of fire() hot path
Every fire() call runs ZREMRANGEBYSCORE to prune dead workers. Stale entries only appear after a worker crash, this is rare. Running it on every notification is unnecessary overhead.
Move it into _refreshUserWorkers instead (runs every 30s anyway), and simplify fire() to just ZRANGEBYSCORE + publish:
// In fire():
this.redis.zrangebyscore(key, minScore, '+inf').then(workers => {
if (!workers || !workers.length) return;
let pipeline = this.redis.pipeline();
workers.forEach(workerId => pipeline.publish(this._getWorkerChannel(workerId), data));
return pipeline.exec();
}).catch(() => false);
// In _refreshUserWorkers, add cleanup:
let minScore = Date.now() - USER_WORKERS_TTL_MS;
for (let userId of counts.keys()) {
let key = this._getUserWorkersKey(userId);
pipeline.zremrangebyscore(key, 0, minScore);
pipeline.zadd(key, now, this.workerId);
pipeline.expire(key, USER_WORKERS_TTL + 60);
}
This drops fire() from MULTI(2 commands) + pipeline to a single ZRANGEBYSCORE + pipeline.
3. Fix dbs.toml comment to match actual behavior
The comment says workerId = false # use os.hostname() instead but the actual ID is hostname:pid
# Worker identifier prefix for distributing notifications.
# Each process appends :PID automatically. Set to false to use os.hostname().
workerId = false
|
@andris9 no other global notifications are sent via wd_events. The provided strategy is safe. Any other events use BullMQ via a separate channel not related to wd_events or ImapNotifier. |
This PR implements a more efficient strategy for sending notifications via pub/sub between WD workers. The strategy also works for distributed workers, for example: server1 and server2 each having 8 workers connect to the redis server. The notifications for a selected user will go only to specified workers of specific servers.