Skip to content

Commit 839ce55

Browse files
committed
fix: address 14 bugs found since v2.61.5
- Disable export resume to prevent corrupted files from append-mode writes with new gzip/encrypt stream headers
- Revert notifyFrom default from null to now so new accounts do not emit webhooks for all existing messages during first sync
- Use nullish coalescing for queueKeep so that 0 is not replaced by true
- Validate exportId in UI routes with strict pattern instead of loose string max
- Guard Export.fail() against deleted keys to prevent zombie Redis entries
- Clean up stale active-set entries whose export hashes have expired
- Call Export.cleanup() on worker startup to remove orphaned files
- Cap Gmail export batch size at 50 to match Outlook behavior
- Capture indexing start time once to ensure deterministic endDate filtering across folders
- Remove swallowed mkdir error so failures propagate properly
- Move NON_RETRYABLE_CODES Set to module scope to avoid per-error allocation
- Use unsigned right shift for score tiebreaker to prevent negative values when hash byte >= 128
- Validate DecryptStream chunk length to prevent memory exhaustion from malformed files
- Destroy file read stream on decrypt error to prevent resource leaks
1 parent 6def6ef commit 839ce55

File tree

10 files changed

+75
-147
lines changed

10 files changed

+75
-147
lines changed

lib/api-routes/account-routes.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ async function init(args) {
195195

196196
logs: Joi.boolean().example(false).description('Store recent logs').default(false),
197197

198-
notifyFrom: accountSchemas.notifyFrom.default(null),
198+
notifyFrom: accountSchemas.notifyFrom.default('now'),
199199
syncFrom: accountSchemas.syncFrom.default(null),
200200

201201
proxy: settingsSchema.proxyUrl,
@@ -630,7 +630,7 @@ async function init(args) {
630630

631631
payload: Joi.object({
632632
flush: Joi.boolean().truthy('Y', 'true', '1').falsy('N', 'false', 0).default(false).description('Only flush the account if true'),
633-
notifyFrom: accountSchemas.notifyFrom.default(null),
633+
notifyFrom: accountSchemas.notifyFrom.default('now'),
634634
imapIndexer: accountSchemas.imapIndexer,
635635
syncFrom: accountSchemas.syncFrom
636636
}).label('RequestFlush')

lib/api-routes/export-routes.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ async function init(args) {
140140
throw Boom.notFound('Export not found');
141141
}
142142

143-
let stream = fs.createReadStream(fileInfo.filePath);
143+
const fileReadStream = fs.createReadStream(fileInfo.filePath);
144+
let stream = fileReadStream;
144145

145146
stream.on('error', err => {
146147
request.logger.error({ msg: 'Export download stream error', exportId, err });
@@ -149,13 +150,15 @@ async function init(args) {
149150
if (fileInfo.isEncrypted) {
150151
const secret = await getSecret();
151152
if (!secret) {
153+
fileReadStream.destroy();
152154
throw Boom.serverUnavailable('Encryption secret not available for decryption');
153155
}
154156
const decryptStream = createDecryptStream(secret);
155157
decryptStream.on('error', err => {
156158
request.logger.error({ msg: 'Export decryption error', exportId, err });
159+
fileReadStream.destroy();
157160
});
158-
stream = stream.pipe(decryptStream);
161+
stream = fileReadStream.pipe(decryptStream);
159162
}
160163

161164
return h

lib/email-client/base-client.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1984,7 +1984,7 @@ class BaseClient {
19841984
// Store message in Redis
19851985
await this.redis.hsetBuffer(`${REDIS_PREFIX}iaq:${this.account}`, queueId, msgEntry);
19861986

1987-
let queueKeep = (await settings.get('queueKeep')) || true;
1987+
let queueKeep = (await settings.get('queueKeep')) ?? true;
19881988

19891989
// Configure delivery retry settings
19901990
let defaultDeliveryAttempts = await settings.get('deliveryAttempts');

lib/email-client/gmail-client.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ const settings = require('../settings');
2929

3030
const { GMAIL_API_BASE, LIST_BATCH_SIZE, request: gmailApiRequest } = require('./gmail/gmail-api');
3131

32+
const MAX_GMAIL_BATCH_SIZE = 50;
33+
3234
// Labels to exclude from folder listings
3335
const SKIP_LABELS = ['UNREAD', 'STARRED', 'IMPORTANT', 'CHAT', 'CATEGORY_PERSONAL'];
3436

@@ -1369,7 +1371,7 @@ class GmailClient extends BaseClient {
13691371

13701372
const results = [];
13711373
const settingsBatchSize = await settings.get('gmailExportBatchSize');
1372-
const batchSize = settingsBatchSize || DEFAULT_GMAIL_EXPORT_BATCH_SIZE;
1374+
const batchSize = Math.min(settingsBatchSize || DEFAULT_GMAIL_EXPORT_BATCH_SIZE, MAX_GMAIL_BATCH_SIZE);
13731375

13741376
for (let i = 0; i < emailIds.length; i += batchSize) {
13751377
const batch = emailIds.slice(i, i + batchSize);

lib/export.js

Lines changed: 21 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class Export {
130130

131131
const activeEntry = await tryAddToActiveSet(account, exportId);
132132

133-
await fs.promises.mkdir(exportPath, { recursive: true }).catch(() => {});
133+
await fs.promises.mkdir(exportPath, { recursive: true });
134134

135135
const secret = await getSecret();
136136
const isEncrypted = !!secret;
@@ -221,7 +221,7 @@ class Export {
221221
};
222222

223223
if (data.status === 'failed') {
224-
result.isResumable = data.isResumable === '1';
224+
result.isResumable = false;
225225
}
226226

227227
return result;
@@ -307,7 +307,7 @@ class Export {
307307

308308
const uniqueKey = `${messageInfo.folder || ''}:${messageInfo.messageId || ''}:${messageInfo.uid || ''}`;
309309
const hash = crypto.createHash('sha256').update(uniqueKey).digest();
310-
const tiebreaker = ((hash[0] << 16) | (hash[1] << 8) | hash[2]) % 1000000;
310+
const tiebreaker = (((hash[0] << 16) | (hash[1] << 8) | hash[2]) >>> 0) % 1000000;
311311

312312
const score = baseSeconds * 1000000 + tiebreaker;
313313

@@ -378,111 +378,29 @@ class Export {
378378
const exportKey = getExportKey(account, exportId);
379379
const queueKey = getExportQueueKey(account, exportId);
380380

381-
const exportData = await redis.hgetall(exportKey);
382-
const lastProcessedScore = Number(exportData?.lastProcessedScore) || 0;
383-
const messagesQueued = Number(exportData?.messagesQueued) || 0;
384-
const messagesExported = Number(exportData?.messagesExported) || 0;
385-
386-
const isResumable = lastProcessedScore > 0 && messagesQueued > 0 && messagesExported < messagesQueued;
387-
388-
const multi = redis
389-
.multi()
390-
.hmset(exportKey, {
391-
status: 'failed',
392-
error: error || 'Unknown error',
393-
isResumable: isResumable ? '1' : '0'
394-
})
395-
.srem(ACTIVE_EXPORTS_KEY, `${account}:${exportId}`);
396-
397-
if (!isResumable) {
398-
multi.del(queueKey);
399-
}
400-
401-
await multi.exec();
402-
403-
logger.error({ msg: 'Export failed', account, exportId, error, isResumable });
404-
}
405-
406-
static async resume(account, exportId) {
407-
const exportKey = getExportKey(account, exportId);
408-
const queueKey = getExportQueueKey(account, exportId);
409-
410381
const exportData = await redis.hgetall(exportKey);
411382
if (!exportData || !exportData.exportId) {
412-
throw createError('Export not found', 'ExportNotFound', 404);
413-
}
414-
415-
if (exportData.status !== 'failed') {
416-
throw createError(`Cannot resume export with status "${exportData.status}"`, 'InvalidExportStatus', 400);
383+
// Key already deleted, just clean up active set
384+
await redis.srem(ACTIVE_EXPORTS_KEY, `${account}:${exportId}`);
385+
return;
417386
}
418387

419-
if (exportData.isResumable !== '1') {
420-
throw createError('Export is not resumable. The export must have made progress before failing.', 'ExportNotResumable', 400);
421-
}
422-
423-
const queueSize = await redis.zcard(queueKey);
424-
if (queueSize === 0) {
425-
throw createError('Export queue no longer exists', 'QueueNotFound', 400);
426-
}
427-
428-
const activeEntry = await tryAddToActiveSet(account, exportId);
429-
430-
const maxAge = await getExportMaxAge();
431-
const ttl = Math.ceil(maxAge / 1000);
432-
const now = Date.now();
433-
const newExpiresAt = now + maxAge;
434-
435388
await redis
436389
.multi()
437390
.hmset(exportKey, {
438-
status: 'queued',
439-
phase: 'exporting',
440-
error: '',
441-
isResumable: '0',
442-
expiresAt: newExpiresAt
391+
status: 'failed',
392+
error: error || 'Unknown error',
393+
isResumable: '0'
443394
})
444-
.expire(exportKey, ttl)
445-
.expire(queueKey, ttl)
395+
.del(queueKey)
396+
.srem(ACTIVE_EXPORTS_KEY, `${account}:${exportId}`)
446397
.exec();
447398

448-
try {
449-
await exportQueue.add(
450-
'export',
451-
{ account, exportId, isResumed: true },
452-
{ jobId: `${exportId}-resume-${Date.now()}`, removeOnComplete: true, removeOnFail: true }
453-
);
454-
} catch (err) {
455-
await redis
456-
.multi()
457-
.hmset(exportKey, {
458-
status: 'failed',
459-
error: exportData.error || 'Resume failed',
460-
isResumable: '1'
461-
})
462-
.srem(ACTIVE_EXPORTS_KEY, activeEntry)
463-
.exec();
464-
throw err;
465-
}
466-
467-
logger.info({
468-
msg: 'Export resumed',
469-
account,
470-
exportId,
471-
lastProcessedScore: exportData.lastProcessedScore,
472-
messagesExported: exportData.messagesExported,
473-
messagesQueued: exportData.messagesQueued
474-
});
399+
logger.error({ msg: 'Export failed', account, exportId, error });
400+
}
475401

476-
return {
477-
exportId,
478-
status: 'queued',
479-
resumed: true,
480-
progress: {
481-
messagesExported: Number(exportData.messagesExported) || 0,
482-
messagesQueued: Number(exportData.messagesQueued) || 0,
483-
messagesSkipped: Number(exportData.messagesSkipped) || 0
484-
}
485-
};
402+
static async resume(account, exportId) {
403+
throw createError('Export resume is not currently supported', 'ExportNotResumable', 501);
486404
}
487405

488406
static async markInterruptedAsFailed() {
@@ -499,6 +417,12 @@ class Export {
499417

500418
const data = await redis.hgetall(getExportKey(account, exportId));
501419

420+
if (!data || !data.exportId) {
421+
// Stale entry -- export hash expired, clean up active set
422+
await redis.srem(ACTIVE_EXPORTS_KEY, entry);
423+
continue;
424+
}
425+
502426
if (data && (data.status === 'processing' || data.status === 'queued' || data.status === 'indexing')) {
503427
const job = await exportQueue.getJob(exportId).catch(() => null);
504428
if (job) {

lib/routes-ui.js

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ const {
4646
accountIdSchema,
4747
defaultAccountTypeSchema,
4848
googleProjectIdSchema,
49-
googleWorkspaceAccountsSchema
49+
googleWorkspaceAccountsSchema,
50+
exportIdSchema
5051
} = require('./schemas');
5152
const fs = require('fs');
5253
const pathlib = require('path');
@@ -1012,7 +1013,7 @@ function applyRoutes(server, call) {
10121013
failAction,
10131014
params: Joi.object({
10141015
account: Joi.string().max(256).required(),
1015-
exportId: Joi.string().max(256).required()
1016+
exportId: exportIdSchema
10161017
})
10171018
}
10181019
}
@@ -1030,7 +1031,8 @@ function applyRoutes(server, call) {
10301031
throw Boom.notFound('Export not found');
10311032
}
10321033

1033-
let stream = fs.createReadStream(fileInfo.filePath);
1034+
const fileReadStream = fs.createReadStream(fileInfo.filePath);
1035+
let stream = fileReadStream;
10341036

10351037
stream.on('error', err => {
10361038
request.logger.error({ msg: 'Export download stream error', exportId, err });
@@ -1040,14 +1042,16 @@ function applyRoutes(server, call) {
10401042
if (fileInfo.isEncrypted) {
10411043
const secret = await getSecret();
10421044
if (!secret) {
1045+
fileReadStream.destroy();
10431046
throw Boom.serverUnavailable('Encryption secret not available for decryption');
10441047
}
10451048
const { createDecryptStream } = require('./stream-encrypt');
10461049
const decryptStream = createDecryptStream(secret);
10471050
decryptStream.on('error', err => {
10481051
request.logger.error({ msg: 'Export decryption error', exportId, err });
1052+
fileReadStream.destroy();
10491053
});
1050-
stream = stream.pipe(decryptStream);
1054+
stream = fileReadStream.pipe(decryptStream);
10511055
}
10521056

10531057
return h
@@ -1070,7 +1074,7 @@ function applyRoutes(server, call) {
10701074
failAction,
10711075
params: Joi.object({
10721076
account: Joi.string().max(256).required(),
1073-
exportId: Joi.string().max(256).required()
1077+
exportId: exportIdSchema
10741078
})
10751079
}
10761080
}

lib/stream-encrypt.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ class DecryptStream extends Transform {
134134

135135
const iv = this.buffer.subarray(0, IV_LENGTH);
136136
const encryptedLength = this.buffer.readUInt32LE(IV_LENGTH);
137+
if (encryptedLength > CHUNK_SIZE + 256) {
138+
throw new Error('Invalid encrypted chunk length');
139+
}
137140
const totalChunkSize = minChunkHeader + encryptedLength + AUTH_TAG_LENGTH;
138141

139142
if (this.buffer.length < totalChunkSize) {

lib/webhooks.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ class WebhooksHandler {
514514
async pushToQueue(event, originalPayload, opts = {}) {
515515
// custom webhook routes
516516
let webhookRoutes = await this.getWebhookHandlers();
517-
let queueKeep = (await settings.get('queueKeep')) || true;
517+
let queueKeep = (await settings.get('queueKeep')) ?? true;
518518

519519
let removePolicy = typeof queueKeep === 'number' ? { age: 24 * 3600, count: queueKeep } : queueKeep;
520520
let jobOpts = {

0 commit comments

Comments (0)