Skip to content

Commit 3c14915

Browse files
authored
fix(application-generic): implement bounded retry strategy for SQS message deletion to handle transient errors (#10889)
1 parent bd2c0ef commit 3c14915

1 file changed

Lines changed: 120 additions & 19 deletions

File tree

libs/application-generic/src/services/sqs/sqs-consumer.service.ts

Lines changed: 120 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,56 @@ import {
1616

1717
const LOG_CONTEXT = 'SqsConsumerService';
1818

19+
/**
20+
* Bounded retry config for SQS DeleteMessage calls.
21+
*
22+
* AWS SDK v3's default retry strategy does NOT classify Node DNS errors
23+
* (EAI_AGAIN, ENOTFOUND, etc.) as transient, so a single resolver hiccup
24+
* causes the delete to fail on the first attempt. When that happens after
25+
* a successful processor run, SQS will redeliver the message after the
26+
* visibility timeout, leading to duplicate processing. This bounded retry
27+
* with exponential backoff prevents that for typical sub-second blips.
28+
*/
29+
const DELETE_MAX_ATTEMPTS = 3;
30+
const DELETE_BACKOFF_BASE_MS = 200;
31+
/**
32+
* Proportional jitter added on top of the exponential backoff (0..25%).
33+
* Prevents synchronized retry storms when many in-flight deletes hit the
34+
* same transient failure (e.g. region-wide DNS blip).
35+
*/
36+
const DELETE_BACKOFF_JITTER_FACTOR = 0.25;
37+
38+
/**
39+
* SQS / AWS service error names that indicate a permanent failure - retrying
40+
* cannot succeed, so we should log and stop immediately to avoid burning
41+
* the retry budget and flooding logs.
42+
*
43+
* - ReceiptHandleIsInvalid: receipt handle expired (visibility timeout passed)
44+
* or never valid; the message will be redelivered regardless of what we do.
45+
* - InvalidParameterValue / InvalidAddress: malformed request.
46+
* - AccessDenied / AccessDeniedException: IAM/policy issue, won't self-heal.
47+
*/
48+
const NON_RETRYABLE_DELETE_ERROR_NAMES = new Set([
49+
'ReceiptHandleIsInvalid',
50+
'InvalidParameterValue',
51+
'InvalidAddress',
52+
'AccessDenied',
53+
'AccessDeniedException',
54+
]);
55+
56+
function isNonRetryableDeleteError(error: unknown): boolean {
57+
if (!error || typeof error !== 'object') {
58+
return false;
59+
}
60+
const errorName = (error as { name?: string }).name;
61+
const errorCode = (error as { Code?: string; code?: string }).Code ?? (error as { code?: string }).code;
62+
63+
return (
64+
(typeof errorName === 'string' && NON_RETRYABLE_DELETE_ERROR_NAMES.has(errorName)) ||
65+
(typeof errorCode === 'string' && NON_RETRYABLE_DELETE_ERROR_NAMES.has(errorCode))
66+
);
67+
}
68+
1969
export type SqsMessageProcessor<T = unknown> = (data: T, meta: ISqsMessageMeta) => Promise<void>;
2070

2171
/**
@@ -195,41 +245,92 @@ export class SqsConsumerService {
195245

196246
this.processMessage(message)
197247
.then(async () => {
198-
try {
199-
await this.sqsService.getClient().send(
200-
new DeleteMessageCommand({
201-
QueueUrl: this.queueUrl,
202-
ReceiptHandle: message.ReceiptHandle,
203-
})
204-
);
248+
await this.deleteMessageWithRetry(message, messageId);
249+
})
250+
.catch((error) => {
251+
Logger.error(
252+
{
253+
error: error instanceof Error ? error.message : String(error),
254+
messageId,
255+
topic: this.topic,
256+
},
257+
'SQS message failed, will be retried via visibility timeout',
258+
LOG_CONTEXT
259+
);
260+
})
261+
.finally(() => {
262+
this.pool.release();
263+
});
264+
}
205265

206-
this.logger?.debug({ messageId, topic: this.topic }, 'SQS message processed and deleted');
207-
} catch (deleteError) {
266+
/**
267+
* Delete an SQS message with bounded exponential backoff retry.
268+
*
269+
* The processor has already succeeded by this point - failing to delete
270+
* means SQS will redeliver and we'll do duplicate work. We retry a few
271+
* times to absorb short-lived DNS/network blips before giving up.
272+
*/
273+
private async deleteMessageWithRetry(message: Message, messageId: string): Promise<void> {
274+
for (let attempt = 1; attempt <= DELETE_MAX_ATTEMPTS; attempt++) {
275+
try {
276+
await this.sqsService.getClient().send(
277+
new DeleteMessageCommand({
278+
QueueUrl: this.queueUrl,
279+
ReceiptHandle: message.ReceiptHandle,
280+
})
281+
);
282+
283+
this.logger?.debug(
284+
{ messageId, topic: this.topic, attempt, maxAttempts: DELETE_MAX_ATTEMPTS },
285+
'SQS message processed and deleted'
286+
);
287+
288+
return;
289+
} catch (deleteError) {
290+
const errorMessage = deleteError instanceof Error ? deleteError.message : String(deleteError);
291+
const errorName = deleteError instanceof Error ? deleteError.name : undefined;
292+
const isFinalAttempt = attempt === DELETE_MAX_ATTEMPTS;
293+
const isNonRetryable = isNonRetryableDeleteError(deleteError);
294+
295+
if (isNonRetryable || isFinalAttempt) {
208296
Logger.error(
209297
{
210-
error: deleteError instanceof Error ? deleteError.message : String(deleteError),
298+
error: errorMessage,
299+
errorName,
211300
messageId,
212301
topic: this.topic,
302+
attempt,
303+
maxAttempts: DELETE_MAX_ATTEMPTS,
304+
nonRetryable: isNonRetryable,
213305
},
214306
'Failed to delete SQS message after successful processing',
215307
LOG_CONTEXT
216308
);
309+
310+
return;
217311
}
218-
})
219-
.catch((error) => {
220-
Logger.error(
312+
313+
Logger.warn(
221314
{
222-
error: error instanceof Error ? error.message : String(error),
315+
error: errorMessage,
316+
errorName,
223317
messageId,
224318
topic: this.topic,
319+
attempt,
320+
maxAttempts: DELETE_MAX_ATTEMPTS,
225321
},
226-
'SQS message failed, will be retried via visibility timeout',
322+
'Transient error deleting SQS message, retrying',
227323
LOG_CONTEXT
228324
);
229-
})
230-
.finally(() => {
231-
this.pool.release();
232-
});
325+
326+
const baseBackoffMs = DELETE_BACKOFF_BASE_MS * 2 ** (attempt - 1);
327+
const jitterMs = Math.floor(Math.random() * baseBackoffMs * DELETE_BACKOFF_JITTER_FACTOR);
328+
const backoffMs = baseBackoffMs + jitterMs;
329+
await new Promise<void>((resolve) => {
330+
setTimeout(resolve, backoffMs);
331+
});
332+
}
333+
}
233334
}
234335

235336
private async processMessage(message: Message): Promise<void> {

0 commit comments

Comments
 (0)