diff --git a/packages/serverless-offline-sqs/src/sqs.js b/packages/serverless-offline-sqs/src/sqs.js index 82e8aab9..789fe2b8 100644 --- a/packages/serverless-offline-sqs/src/sqs.js +++ b/packages/serverless-offline-sqs/src/sqs.js @@ -82,7 +82,7 @@ class SQS { } async _sqsEvent(functionKey, sqsEvent) { - const {enabled, arn, queueName, batchSize = 10} = sqsEvent; + const {enabled, arn, queueName, batchSize = 10, functionResponseType} = sqsEvent; if (!enabled) return; @@ -109,6 +109,18 @@ class SQS { return getMessages(size - Messages.length, [...messages, ...Messages]); }; + const getSuccessfullyProcessedMessages = (messages, result) => { + if (functionResponseType !== 'ReportBatchItemFailures') { + return messages; + } + + const failedMessageIds = new Set( + result?.batchItemFailures?.map(message => message?.itemIdentifier) ?? [] + ); + + return messages.filter(({MessageId}) => !failedMessageIds.has(MessageId)); + }; + const job = async () => { const messages = await getMessages(batchSize); @@ -119,12 +131,13 @@ class SQS { const event = new SQSEvent(messages, this.region, arn); lambdaFunction.setEvent(event); - await lambdaFunction.runHandler(); + const result = await lambdaFunction.runHandler(); + const messagesToDelete = getSuccessfullyProcessedMessages(messages, result); await Promise.all( chunk( 10, - (messages || []).map(({MessageId: Id, ReceiptHandle}) => ({ + (messagesToDelete || []).map(({MessageId: Id, ReceiptHandle}) => ({ Id, ReceiptHandle }))