Skip to content

Commit f559c38

Browse files
committed
fix: improve submit resilience during worker restarts and add batch endpoint
Wait for IMAP worker reassignment before failing submit/queue operations with WorkerNotAvailable, allow 503 errors to be retried by BullMQ instead of discarding, add batch submit endpoint with chunked concurrency and upfront account validation, and remove duplicate logger call in discard error path.
1 parent ca7af4a commit f559c38

File tree

6 files changed

+1368
-11
lines changed

6 files changed

+1368
-11
lines changed

server.js

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1500,6 +1500,39 @@ async function call(worker, message, transferList) {
15001500
});
15011501
}
15021502

1503+
/**
1504+
* Wait for an account to become assigned to an IMAP worker.
1505+
* Used by submitMessage/queueMessage to survive brief worker restarts
1506+
* instead of failing immediately with WorkerNotAvailable.
1507+
* @param {string} account - Account identifier
1508+
* @param {number} [timeout=12000] - Maximum wait time in ms
1509+
* @param {number} [interval=500] - Poll interval in ms
1510+
* @returns {Promise<boolean>} True if account became assigned, false if timed out
1511+
*/
1512+
function waitForAssignment(account, timeout = 12000, interval = 500) {
1513+
return new Promise(resolve => {
1514+
if (assigned.has(account)) {
1515+
return resolve(true);
1516+
}
1517+
if (isClosing) {
1518+
return resolve(false);
1519+
}
1520+
1521+
let elapsed = 0;
1522+
let timer = setInterval(() => {
1523+
elapsed += interval;
1524+
if (assigned.has(account)) {
1525+
clearInterval(timer);
1526+
return resolve(true);
1527+
}
1528+
if (isClosing || elapsed >= timeout) {
1529+
clearInterval(timer);
1530+
return resolve(false);
1531+
}
1532+
}, interval);
1533+
});
1534+
}
1535+
15031536
/**
15041537
* Assign unassigned accounts to available IMAP workers
15051538
* Uses load-aware distribution with round-robin for initial assignment
@@ -2619,6 +2652,26 @@ async function onCommand(worker, message) {
26192652
}
26202653
break;
26212654

2655+
// Submit/queue operations - wait for worker assignment before failing
2656+
case 'submitMessage':
2657+
case 'queueMessage': {
2658+
if (!assigned.has(message.account)) {
2659+
let wasAssigned = await waitForAssignment(message.account);
2660+
if (!wasAssigned) {
2661+
throw NO_ACTIVE_HANDLER_RESP_ERR;
2662+
}
2663+
}
2664+
2665+
let assignedWorker = assigned.get(message.account);
2666+
2667+
let transferList = [];
2668+
if (typeof message.raw === 'object') {
2669+
transferList.push(message.raw);
2670+
}
2671+
2672+
return await call(assignedWorker, message, transferList);
2673+
}
2674+
26222675
// IMAP operations - forward to assigned worker
26232676
case 'listMessages':
26242677
case 'getRawMessage':
@@ -2636,8 +2689,6 @@ async function onCommand(worker, message) {
26362689
case 'createMailbox':
26372690
case 'modifyMailbox':
26382691
case 'deleteMailbox':
2639-
case 'submitMessage':
2640-
case 'queueMessage':
26412692
case 'uploadMessage':
26422693
case 'getAttachment':
26432694
case 'listSignatures': {
@@ -2653,10 +2704,6 @@ async function onCommand(worker, message) {
26532704
transferList.push(message.port);
26542705
}
26552706

2656-
if (['submitMessage', 'queueMessage'].includes(message.cmd) && typeof message.raw === 'object') {
2657-
transferList.push(message.raw);
2658-
}
2659-
26602707
return await call(assignedWorker, message, transferList);
26612708
}
26622709

test/batch-submit-test.js

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
'use strict';
2+
3+
const test = require('node:test');
4+
const assert = require('node:assert').strict;
5+
const Joi = require('joi');
6+
7+
// Reconstruct a minimal version of the batch submit validation schema
8+
// to test Joi validation rules independently of the running server
9+
const MAX_ATTACHMENT_SIZE = 25 * 1024 * 1024;
10+
11+
const addressSchema = Joi.object({
12+
name: Joi.string().max(256),
13+
address: Joi.string().email().required()
14+
});
15+
16+
const batchMessageSchema = Joi.object({
17+
from: Joi.object({
18+
name: Joi.string().max(256),
19+
address: Joi.string().email().required()
20+
}),
21+
22+
to: Joi.array().items(addressSchema).single(),
23+
24+
cc: Joi.array().items(addressSchema).single(),
25+
26+
bcc: Joi.array().items(addressSchema).single(),
27+
28+
subject: Joi.string().max(10240),
29+
text: Joi.string().max(5 * 1024 * 1024),
30+
html: Joi.string().max(5 * 1024 * 1024),
31+
32+
raw: Joi.string().base64().max(MAX_ATTACHMENT_SIZE),
33+
34+
attachments: Joi.array().items(
35+
Joi.object({
36+
filename: Joi.string().max(256),
37+
content: Joi.string().base64().max(MAX_ATTACHMENT_SIZE).required(),
38+
contentType: Joi.string().lowercase().max(256),
39+
encoding: Joi.string().valid('base64').default('base64')
40+
})
41+
),
42+
43+
messageId: Joi.string().max(996),
44+
headers: Joi.object().unknown(),
45+
46+
sendAt: Joi.date().iso(),
47+
deliveryAttempts: Joi.number().integer(),
48+
gateway: Joi.string().max(256)
49+
})
50+
.oxor('raw', 'html')
51+
.oxor('raw', 'text')
52+
.oxor('raw', 'attachments');
53+
54+
const batchPayloadSchema = Joi.object({
55+
messages: Joi.array().items(batchMessageSchema).min(1).max(50).required()
56+
});
57+
58+
// Simulate the batch handler logic
59+
async function simulateBatchHandler(messages, queueMessageFn) {
60+
let results = await Promise.allSettled(
61+
messages.map(async (msg, index) => {
62+
let response = await queueMessageFn(msg, index);
63+
return { index, response };
64+
})
65+
);
66+
67+
let successCount = 0;
68+
let failureCount = 0;
69+
let entries = [];
70+
71+
for (let i = 0; i < results.length; i++) {
72+
let result = results[i];
73+
if (result.status === 'fulfilled') {
74+
successCount++;
75+
entries.push({
76+
index: result.value.index,
77+
success: true,
78+
queueId: result.value.response.queueId || null,
79+
messageId: result.value.response.messageId || null,
80+
sendAt: result.value.response.sendAt || null
81+
});
82+
} else {
83+
failureCount++;
84+
let err = result.reason;
85+
entries.push({
86+
index: i,
87+
success: false,
88+
error: {
89+
message: err.message,
90+
code: err.code || null
91+
}
92+
});
93+
}
94+
}
95+
96+
return {
97+
totalMessages: messages.length,
98+
successCount,
99+
failureCount,
100+
results: entries
101+
};
102+
}
103+
104+
test('Batch submit validation tests', async t => {
105+
await t.test('accepts a valid batch payload', async () => {
106+
let payload = {
107+
messages: [
108+
{
109+
to: [{ address: 'recipient@example.com' }],
110+
subject: 'Test 1',
111+
text: 'Hello 1'
112+
},
113+
{
114+
to: [{ address: 'another@example.com' }],
115+
subject: 'Test 2',
116+
html: '<p>Hello 2</p>'
117+
}
118+
]
119+
};
120+
121+
let result = batchPayloadSchema.validate(payload);
122+
assert.strictEqual(result.error, undefined, 'Should accept valid batch payload');
123+
assert.strictEqual(result.value.messages.length, 2);
124+
});
125+
126+
await t.test('rejects empty messages array', async () => {
127+
let payload = {
128+
messages: []
129+
};
130+
131+
let result = batchPayloadSchema.validate(payload);
132+
assert.ok(result.error, 'Should reject empty messages array');
133+
assert.ok(result.error.message.includes('at least'), 'Error should mention minimum');
134+
});
135+
136+
await t.test('rejects batch exceeding max size of 50', async () => {
137+
let messages = [];
138+
for (let i = 0; i < 51; i++) {
139+
messages.push({
140+
to: [{ address: `recipient${i}@example.com` }],
141+
subject: `Test ${i}`,
142+
text: `Hello ${i}`
143+
});
144+
}
145+
146+
let result = batchPayloadSchema.validate({ messages });
147+
assert.ok(result.error, 'Should reject batch exceeding 50 messages');
148+
assert.ok(result.error.message.includes('50') || result.error.message.includes('less'), 'Error should mention the limit');
149+
});
150+
151+
await t.test('rejects message with both raw and html', async () => {
152+
let payload = {
153+
messages: [
154+
{
155+
to: [{ address: 'recipient@example.com' }],
156+
raw: 'dGVzdA==',
157+
html: '<p>conflict</p>'
158+
}
159+
]
160+
};
161+
162+
let result = batchPayloadSchema.validate(payload);
163+
assert.ok(result.error, 'Should reject message with both raw and html');
164+
});
165+
166+
await t.test('rejects missing messages field', async () => {
167+
let result = batchPayloadSchema.validate({});
168+
assert.ok(result.error, 'Should reject missing messages field');
169+
});
170+
171+
await t.test('accepts batch at exactly max size of 50', async () => {
172+
let messages = [];
173+
for (let i = 0; i < 50; i++) {
174+
messages.push({
175+
to: [{ address: `recipient${i}@example.com` }],
176+
subject: `Test ${i}`,
177+
text: `Hello ${i}`
178+
});
179+
}
180+
181+
let result = batchPayloadSchema.validate({ messages });
182+
assert.strictEqual(result.error, undefined, 'Should accept exactly 50 messages');
183+
});
184+
});
185+
186+
test('Batch submit handler tests', async t => {
187+
await t.test('all messages succeed', async () => {
188+
let messages = [
189+
{ to: [{ address: 'a@example.com' }], subject: 'A', text: 'A' },
190+
{ to: [{ address: 'b@example.com' }], subject: 'B', text: 'B' },
191+
{ to: [{ address: 'c@example.com' }], subject: 'C', text: 'C' }
192+
];
193+
194+
let response = await simulateBatchHandler(messages, async (msg, index) => ({
195+
queueId: `queue-${index}`,
196+
messageId: `<msg-${index}@example.com>`,
197+
sendAt: '2021-07-08T07:06:34.336Z'
198+
}));
199+
200+
assert.strictEqual(response.totalMessages, 3);
201+
assert.strictEqual(response.successCount, 3);
202+
assert.strictEqual(response.failureCount, 0);
203+
assert.strictEqual(response.results.length, 3);
204+
205+
for (let i = 0; i < 3; i++) {
206+
assert.strictEqual(response.results[i].index, i);
207+
assert.strictEqual(response.results[i].success, true);
208+
assert.strictEqual(response.results[i].queueId, `queue-${i}`);
209+
assert.strictEqual(response.results[i].messageId, `<msg-${i}@example.com>`);
210+
}
211+
});
212+
213+
await t.test('partial failure - mix of success and error', async () => {
214+
let messages = [
215+
{ to: [{ address: 'a@example.com' }], subject: 'A', text: 'A' },
216+
{ to: [{ address: 'invalid' }], subject: 'B', text: 'B' },
217+
{ to: [{ address: 'c@example.com' }], subject: 'C', text: 'C' }
218+
];
219+
220+
let response = await simulateBatchHandler(messages, async (msg, index) => {
221+
if (index === 1) {
222+
let err = new Error('Invalid recipient');
223+
err.code = 'InputValidationError';
224+
throw err;
225+
}
226+
return {
227+
queueId: `queue-${index}`,
228+
messageId: `<msg-${index}@example.com>`
229+
};
230+
});
231+
232+
assert.strictEqual(response.totalMessages, 3);
233+
assert.strictEqual(response.successCount, 2);
234+
assert.strictEqual(response.failureCount, 1);
235+
assert.strictEqual(response.results.length, 3);
236+
237+
// First message succeeds
238+
assert.strictEqual(response.results[0].success, true);
239+
assert.strictEqual(response.results[0].queueId, 'queue-0');
240+
241+
// Second message fails
242+
assert.strictEqual(response.results[1].success, false);
243+
assert.strictEqual(response.results[1].error.message, 'Invalid recipient');
244+
assert.strictEqual(response.results[1].error.code, 'InputValidationError');
245+
246+
// Third message succeeds
247+
assert.strictEqual(response.results[2].success, true);
248+
assert.strictEqual(response.results[2].queueId, 'queue-2');
249+
});
250+
251+
await t.test('all messages fail', async () => {
252+
let messages = [
253+
{ to: [{ address: 'a@example.com' }], subject: 'A', text: 'A' },
254+
{ to: [{ address: 'b@example.com' }], subject: 'B', text: 'B' }
255+
];
256+
257+
let response = await simulateBatchHandler(messages, async () => {
258+
let err = new Error('Service unavailable');
259+
err.code = 'WorkerNotAvailable';
260+
throw err;
261+
});
262+
263+
assert.strictEqual(response.totalMessages, 2);
264+
assert.strictEqual(response.successCount, 0);
265+
assert.strictEqual(response.failureCount, 2);
266+
267+
for (let result of response.results) {
268+
assert.strictEqual(result.success, false);
269+
assert.strictEqual(result.error.code, 'WorkerNotAvailable');
270+
}
271+
});
272+
273+
await t.test('handles messages with no queueId or messageId gracefully', async () => {
274+
let messages = [{ to: [{ address: 'a@example.com' }], subject: 'A', text: 'A' }];
275+
276+
let response = await simulateBatchHandler(messages, async () => ({
277+
response: 'Queued for delivery'
278+
}));
279+
280+
assert.strictEqual(response.successCount, 1);
281+
assert.strictEqual(response.results[0].success, true);
282+
assert.strictEqual(response.results[0].queueId, null);
283+
assert.strictEqual(response.results[0].messageId, null);
284+
});
285+
286+
await t.test('handles errors without code property', async () => {
287+
let messages = [{ to: [{ address: 'a@example.com' }], subject: 'A', text: 'A' }];
288+
289+
let response = await simulateBatchHandler(messages, async () => {
290+
throw new Error('Something went wrong');
291+
});
292+
293+
assert.strictEqual(response.failureCount, 1);
294+
assert.strictEqual(response.results[0].success, false);
295+
assert.strictEqual(response.results[0].error.message, 'Something went wrong');
296+
assert.strictEqual(response.results[0].error.code, null);
297+
});
298+
});

0 commit comments

Comments
 (0)