Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions config/dbs.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# mongodb connection string for the main database
mongo="mongodb://127.0.0.1:27017/wildduck"
mongo = "mongodb://127.0.0.1:27017/wildduck"

# redis connection string to connect to a single master (see below for Sentinel example)
#redis="redis://127.0.0.1:6379/3"
Expand All @@ -22,14 +22,18 @@ mongo="mongodb://127.0.0.1:27017/wildduck"

# Optional database name or connection url for ZoneMTA queue database. This is
# used to push outbound emails to the sending queue
sender="zone-mta"
sender = "zone-mta"

#queued="mail"

# Worker identifier prefix for distributing notifications.
# Each process appends :PID automatically. Set to false to use os.hostname().
workerId = false

[redis]
host="127.0.0.1"
port=6379
db=3
host = "127.0.0.1"
port = 6379
db = 3

## Connect to Redis Sentinel instead of single master
# [redis]
Expand Down
115 changes: 110 additions & 5 deletions lib/imap-notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,28 @@
const config = require('@zone-eu/wild-config');
const tools = require('./tools');
const consts = require('./consts');
const os = require('os');
const EventEmitter = require('events').EventEmitter;
const Redis = require('ioredis');
const log = require('npmlog');
const counters = require('./counters');
const errors = require('./errors');
const { publish, MARKED_SPAM, MARKED_HAM } = require('./events');

const WORKER_CHANNEL_PREFIX = 'wd_events:worker:';
const USER_WORKERS_PREFIX = 'wd:imap:users:';
const USER_WORKERS_TTL = Math.max(Number(config?.imap?.notifyUserWorkersTtl) || 120, 30);
const USER_WORKERS_REFRESH = Math.max(Number(config?.imap?.notifyUserWorkersRefresh) || Math.floor(USER_WORKERS_TTL / 4), 10);
const USER_WORKERS_TTL_MS = USER_WORKERS_TTL * 1000 - 1000; // allow for 1 sec window just in case
const WORKER_ID = `${config?.dbs?.workerId || os.hostname()}:${process.pid}`;
const USER_REGISTRY_STATE = {
counts: new Map(),
workerId: WORKER_ID,
workerChannel: `${WORKER_CHANNEL_PREFIX}${WORKER_ID}`,
timer: null,
redis: null
};

class ImapNotifier extends EventEmitter {
constructor(options) {
super();
Expand All @@ -18,6 +33,9 @@ class ImapNotifier extends EventEmitter {
this.redis = options.redis || new Redis(tools.redisConfig(config.dbs.redis));
errors.registerRedisErrorLogger(this.redis, { role: 'imap-notifier' }); // if separate redis will add the appropriate role
this.counters = counters(this.redis);
this._userRegistryState = USER_REGISTRY_STATE;
this.workerId = this._userRegistryState.workerId;
this._workerChannel = this._userRegistryState.workerChannel;

this.logger = options.logger || {
info: log.silly.bind(log, 'IMAP'),
Expand Down Expand Up @@ -79,7 +97,7 @@ class ImapNotifier extends EventEmitter {
};

this.subscriber.on('message', (channel, message) => {
if (channel === 'wd_events') {
if (channel === this._workerChannel) {
let data;
// if e present at beginning, check if p also is present
// if no p -> no json parse
Expand Down Expand Up @@ -118,7 +136,13 @@ class ImapNotifier extends EventEmitter {
}
});

this.subscriber.subscribe('wd_events');
this.subscriber.subscribe(this._workerChannel);

if (!this._userRegistryState.timer) {
this._userRegistryState.redis = this.redis;
this._userRegistryState.timer = setInterval(() => this._refreshUserWorkers(), USER_WORKERS_REFRESH * 1000);
this._userRegistryState.timer.unref();
}
}

/**
Expand All @@ -128,7 +152,9 @@ class ImapNotifier extends EventEmitter {
* @param {Function} handler Function to run once there are new entries in the journal
*/
addListener(session, handler) {
this._listeners.addListener(session.user.id.toString(), handler);
let userId = session.user.id.toString();
this._listeners.addListener(userId, handler);
this._incrementUserListener(userId);

this.logger.debug('[%s] New journal listener for %s (%s)', session.id, session.user.id.toString(), session.user.username);
}
Expand All @@ -140,7 +166,9 @@ class ImapNotifier extends EventEmitter {
* @param {Function} handler Function to run once there are new entries in the journal
*/
removeListener(session, handler) {
this._listeners.removeListener(session.user.id.toString(), handler);
let userId = session.user.id.toString();
this._listeners.removeListener(userId, handler);
this._decrementUserListener(userId);

this.logger.debug('[%s] Removed journal listener from %s (%s)', session.id, session.user.id.toString(), session.user.username);
}
Expand Down Expand Up @@ -305,10 +333,87 @@ class ImapNotifier extends EventEmitter {
e: user.toString(),
p: payload
});
this.redis.publish('wd_events', data);
let key = this._getUserWorkersKey(user.toString());
let minScore = Date.now() - USER_WORKERS_TTL_MS;
// send message only to workers that actually service this user
this.redis
.zrangebyscore(key, minScore, '+inf') // get active workers
.then(workers => {
if (!workers || !workers.length) {
return;
}
let pipeline = this.redis.pipeline();
workers.forEach(workerId => pipeline.publish(this._getWorkerChannel(workerId), data));
return pipeline.exec();
})
.catch(() => false);
});
}

_getUserWorkersKey(userId) {
return `${USER_WORKERS_PREFIX}${userId}`;
}

_getWorkerChannel(workerId) {
return `${WORKER_CHANNEL_PREFIX}${workerId}`;
}

_incrementUserListener(userId) {
let counts = this._userRegistryState.counts;
let count = counts.get(userId) || 0;
count++;
counts.set(userId, count);
if (count === 1) {
this._registerUserWorker(userId);
}
}

_decrementUserListener(userId) {
let counts = this._userRegistryState.counts;
let count = counts.get(userId) || 0;
count--;
if (count <= 0) {
counts.delete(userId);
this._unregisterUserWorker(userId);
} else {
counts.set(userId, count);
}
}

_registerUserWorker(userId) {
let key = this._getUserWorkersKey(userId);
this.redis
.pipeline()
.zadd(key, Date.now(), this.workerId)
.expire(key, USER_WORKERS_TTL + 60)
.exec()
.catch(() => false);
}

_unregisterUserWorker(userId) {
let key = this._getUserWorkersKey(userId);
this.redis.zrem(key, this.workerId).catch(() => false);
}

_refreshUserWorkers() {
let counts = this._userRegistryState.counts;
if (!counts || !counts.size) {
return;
}

let redis = this._userRegistryState.redis || this.redis;
let pipeline = redis.pipeline();
let now = Date.now();
let minScore = now - USER_WORKERS_TTL_MS;
for (let userId of counts.keys()) {
let key = this._getUserWorkersKey(userId);
pipeline.zremrangebyscore(key, 0, minScore);
pipeline.zadd(key, now, this.workerId);
pipeline.expire(key, USER_WORKERS_TTL + 60);
}
pipeline.exec().catch(() => false);
}

/**
* Returns all entries from the journal that have higher than provided modification index
*
Expand Down