Skip to content

Commit 3a73704

Browse files
committed
fix: use consistent index source in batch submit success and failure paths
The batch handler used result.value.index (computed as offset+chunkIndex) for successes but the loop counter i for failures. Both now use i, which is correct for sequentially-built allResults and eliminates the inconsistency. Updated the test mirror function to match the chunked processing logic and added a cross-chunk failure test.
1 parent f559c38 commit 3a73704

File tree

2 files changed

+71
-12
lines changed

2 files changed

+71
-12
lines changed

test/batch-submit-test.js

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,25 +55,33 @@ const batchPayloadSchema = Joi.object({
5555
messages: Joi.array().items(batchMessageSchema).min(1).max(50).required()
5656
});
5757

58-
// Simulate the batch handler logic
59-
async function simulateBatchHandler(messages, queueMessageFn) {
60-
let results = await Promise.allSettled(
61-
messages.map(async (msg, index) => {
62-
let response = await queueMessageFn(msg, index);
63-
return { index, response };
64-
})
65-
);
58+
// Simulate the batch handler logic (mirrors workers/api.js chunked processing)
59+
async function simulateBatchHandler(messages, queueMessageFn, batchConcurrency) {
60+
let BATCH_CONCURRENCY = batchConcurrency || 10;
61+
let allResults = [];
62+
63+
for (let offset = 0; offset < messages.length; offset += BATCH_CONCURRENCY) {
64+
let chunk = messages.slice(offset, offset + BATCH_CONCURRENCY);
65+
let chunkResults = await Promise.allSettled(
66+
chunk.map(async (msg, chunkIndex) => {
67+
let globalIndex = offset + chunkIndex;
68+
let response = await queueMessageFn(msg, globalIndex);
69+
return { index: globalIndex, response };
70+
})
71+
);
72+
allResults.push(...chunkResults);
73+
}
6674

6775
let successCount = 0;
6876
let failureCount = 0;
6977
let entries = [];
7078

71-
for (let i = 0; i < results.length; i++) {
72-
let result = results[i];
79+
for (let i = 0; i < allResults.length; i++) {
80+
let result = allResults[i];
7381
if (result.status === 'fulfilled') {
7482
successCount++;
7583
entries.push({
76-
index: result.value.index,
84+
index: i,
7785
success: true,
7886
queueId: result.value.response.queueId || null,
7987
messageId: result.value.response.messageId || null,
@@ -295,4 +303,55 @@ test('Batch submit handler tests', async t => {
295303
assert.strictEqual(response.results[0].error.message, 'Something went wrong');
296304
assert.strictEqual(response.results[0].error.code, null);
297305
});
306+
307+
await t.test('chunked batch failure in second chunk reports correct index', async () => {
308+
// Use batchConcurrency=3 so 7 messages produce 3 chunks: [0,1,2], [3,4,5], [6]
309+
let messages = [];
310+
for (let i = 0; i < 7; i++) {
311+
messages.push({ to: [{ address: `user${i}@example.com` }], subject: `Msg ${i}`, text: `Body ${i}` });
312+
}
313+
314+
// Fail messages at indices 4 (second chunk) and 6 (third chunk)
315+
let response = await simulateBatchHandler(
316+
messages,
317+
async (msg, index) => {
318+
if (index === 4 || index === 6) {
319+
let err = new Error(`Failed at index ${index}`);
320+
err.code = 'TestError';
321+
throw err;
322+
}
323+
return {
324+
queueId: `queue-${index}`,
325+
messageId: `<msg-${index}@example.com>`
326+
};
327+
},
328+
3
329+
);
330+
331+
assert.strictEqual(response.totalMessages, 7);
332+
assert.strictEqual(response.successCount, 5);
333+
assert.strictEqual(response.failureCount, 2);
334+
assert.strictEqual(response.results.length, 7);
335+
336+
// Verify every result has the correct index
337+
for (let i = 0; i < 7; i++) {
338+
assert.strictEqual(response.results[i].index, i, `Result at position ${i} should have index ${i}`);
339+
}
340+
341+
// Verify the failures are at the right positions
342+
assert.strictEqual(response.results[4].success, false);
343+
assert.strictEqual(response.results[4].error.message, 'Failed at index 4');
344+
assert.strictEqual(response.results[4].error.code, 'TestError');
345+
346+
assert.strictEqual(response.results[6].success, false);
347+
assert.strictEqual(response.results[6].error.message, 'Failed at index 6');
348+
349+
// Verify successes have correct queueIds
350+
assert.strictEqual(response.results[0].success, true);
351+
assert.strictEqual(response.results[0].queueId, 'queue-0');
352+
assert.strictEqual(response.results[3].success, true);
353+
assert.strictEqual(response.results[3].queueId, 'queue-3');
354+
assert.strictEqual(response.results[5].success, true);
355+
assert.strictEqual(response.results[5].queueId, 'queue-5');
356+
});
298357
});

workers/api.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3730,7 +3730,7 @@ Include your token in requests using one of these methods:
37303730
if (result.status === 'fulfilled') {
37313731
successCount++;
37323732
entries.push({
3733-
index: result.value.index,
3733+
index: i,
37343734
success: true,
37353735
queueId: result.value.response.queueId || null,
37363736
messageId: result.value.response.messageId || null,

0 commit comments

Comments
 (0)