Skip to content

Commit 1c2556a

Browse files
Add ExponentialBackoffErrorHandlerWithFullJitter, ExponentialBackoffErrorHandlerWithHalfJitter, LinearBackoffErrorHandler to manage message visibility timeouts and implement exponential backoff logic for retries in SQS (#1314) (#1422)
1 parent a4e9d02 commit 1c2556a

12 files changed

+1834
-32
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,6 +1396,181 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
13961396
}
13971397
----
13981398

1399+
==== Exponential Backoff Full Jitter Error Handler
1400+
This error handler applies an exponential backoff strategy with *full jitter*
1401+
when retrying failed message processing. After calculating the exponential
1402+
visibility timeout using the `ApproximateReceiveCount` message attribute, a
1403+
random value between zero and the computed timeout is selected. The selected
1404+
value becomes the new visibility timeout, spreading retries and helping to
1405+
avoid spikes caused by synchronized retries.
1406+
1407+
[cols="2,3,1,1"]
1408+
|===
1409+
| Name | Description | Required | Default
1410+
| `initialVisibilityTimeoutSeconds` | Starting visibility timeout (in seconds)
1411+
used on the first receive attempt. | No | 100
1412+
| `multiplier` | Factor applied to the visibility timeout after each retry. A
1413+
value greater than 1 increases the delay exponentially. | No | 2.0
1414+
| `maxVisibilityTimeoutSeconds` | Maximum allowed visibility timeout in seconds.
1415+
| No | 43200
1416+
| `randomSupplier` | Supplier for the random value used to calculate the
1417+
jitter. | No | `ThreadLocalRandom::current`
1418+
|===
1419+
1420+
NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours).
1421+
If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds
1422+
this limit, an `IllegalArgumentException` will be thrown.
1423+
1424+
When using auto-configured factory, simply declare a `@Bean` and the error
1425+
handler will be set
1426+
1427+
[source, java]
1428+
----
1429+
@Bean
1430+
public ExponentialBackoffErrorHandlerWithFullJitter<Object> asyncErrorHandler() {
1431+
return ExponentialBackoffErrorHandlerWithFullJitter
1432+
.builder()
1433+
.initialVisibilityTimeoutSeconds(1)
1434+
.multiplier(2)
1435+
.maxVisibilityTimeoutSeconds(10)
1436+
.build();
1437+
}
1438+
----
1439+
1440+
Alternatively, `ExponentialBackoffErrorHandlerWithFullJitter` can be set in the
1441+
`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:
1442+
1443+
[source, java]
1444+
----
1445+
@Bean
1446+
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
1447+
return SqsMessageListenerContainerFactory
1448+
.builder()
1449+
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
1450+
.errorHandler(ExponentialBackoffErrorHandlerWithFullJitter
1451+
.builder()
1452+
.initialVisibilityTimeoutSeconds(1)
1453+
.multiplier(2)
1454+
.maxVisibilityTimeoutSeconds(10)
1455+
.build())
1456+
.build();
1457+
}
1458+
----
1459+
1460+
==== Exponential Backoff Half Jitter Error Handler
1461+
This variant also computes the visibility timeout exponentially but applies
1462+
*half jitter*. The exponential delay is halved and a random value between zero
1463+
and this half is added to it. The resulting timeout is then used to change the
1464+
message visibility.
1465+
1466+
[cols="2,3,1,1"]
1467+
|===
1468+
| Name | Description | Required | Default
1469+
| `initialVisibilityTimeoutSeconds` | Starting visibility timeout (in seconds)
1470+
used on the first receive attempt. | No | 100
1471+
| `multiplier` | Factor applied to the visibility timeout after each retry. A
1472+
value greater than 1 increases the delay exponentially. | No | 2.0
1473+
| `maxVisibilityTimeoutSeconds` | Maximum allowed visibility timeout in seconds.
1474+
| No | 43200
1475+
| `randomSupplier` | Supplier for the random value used to calculate the
1476+
jitter. | No | `ThreadLocalRandom::current`
1477+
|===
1478+
1479+
NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours).
1480+
If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds
1481+
this limit, an `IllegalArgumentException` will be thrown.
1482+
1483+
When using auto-configured factory, simply declare a `@Bean` and the error
1484+
handler will be set
1485+
1486+
[source, java]
1487+
----
1488+
@Bean
1489+
public ExponentialBackoffErrorHandlerWithHalfJitter<Object> asyncErrorHandler() {
1490+
return ExponentialBackoffErrorHandlerWithHalfJitter
1491+
.builder()
1492+
.initialVisibilityTimeoutSeconds(1)
1493+
.multiplier(2)
1494+
.maxVisibilityTimeoutSeconds(10)
1495+
.build();
1496+
}
1497+
----
1498+
1499+
Alternatively, `ExponentialBackoffErrorHandlerWithHalfJitter` can be set in the
1500+
`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:
1501+
1502+
[source, java]
1503+
----
1504+
@Bean
1505+
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
1506+
return SqsMessageListenerContainerFactory
1507+
.builder()
1508+
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
1509+
.errorHandler(ExponentialBackoffErrorHandlerWithHalfJitter
1510+
.builder()
1511+
.initialVisibilityTimeoutSeconds(1)
1512+
.multiplier(2)
1513+
.maxVisibilityTimeoutSeconds(10)
1514+
.build())
1515+
.build();
1516+
}
1517+
----
1518+
1519+
==== Linear Backoff Error Handler
1520+
`LinearBackoffErrorHandler` increases the visibility timeout linearly whenever a
1521+
message processing attempt fails. Instead of exponential growth, the timeout is
1522+
incremented by a fixed value on each retry until the maximum is reached.
1523+
1524+
[cols="2,3,1,1"]
1525+
|===
1526+
| Name | Description | Required | Default
1527+
| `initialVisibilityTimeoutSeconds` | Initial visibility timeout (in seconds) for
1528+
the first processing attempt. | No | 100
1529+
| `increment` | Amount of seconds added to the visibility timeout after each
1530+
retry. | No | 2
1531+
| `maxVisibilityTimeoutSeconds` | Maximum allowed visibility timeout in seconds.
1532+
| No | 43200
1533+
|===
1534+
1535+
NOTE: The maximum visibility timeout allowed by SQS is 43200 seconds (12 hours).
1536+
If the value provided to the `maxVisibilityTimeoutSeconds` parameter exceeds
1537+
this limit, an `IllegalArgumentException` will be thrown.
1538+
1539+
When using auto-configured factory, simply declare a `@Bean` and the error
1540+
handler will be set
1541+
1542+
[source, java]
1543+
----
1544+
@Bean
1545+
public LinearBackoffErrorHandler<Object> asyncErrorHandler() {
1546+
return LinearBackoffErrorHandler
1547+
.builder()
1548+
.initialVisibilityTimeoutSeconds(1)
1549+
.increment(2)
1550+
.maxVisibilityTimeoutSeconds(10)
1551+
.build();
1552+
}
1553+
----
1554+
1555+
Alternatively, `LinearBackoffErrorHandler` can be set in the
1556+
`MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:
1557+
1558+
[source, java]
1559+
----
1560+
@Bean
1561+
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
1562+
return SqsMessageListenerContainerFactory
1563+
.builder()
1564+
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
1565+
.errorHandler(LinearBackoffErrorHandler
1566+
.builder()
1567+
.initialVisibilityTimeoutSeconds(1)
1568+
.increment(2)
1569+
.maxVisibilityTimeoutSeconds(10)
1570+
.build())
1571+
.build();
1572+
}
1573+
----
13991574
=== Message Conversion and Payload Deserialization
14001575

14011576
Payloads are automatically deserialized from `JSON` for `@SqsListener` annotated methods using a `MappingJackson2MessageConverter`.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener.errorhandler;
17+
18+
import io.awspring.cloud.sqs.listener.Visibility;
19+
20+
/**
21+
* Constants for Backoff Error Handler.
22+
*
23+
* @author Bruno Garcia
24+
* @author Rafael Pavarini
25+
*/
26+
public class BackoffVisibilityConstants {
27+
/**
28+
* The default initial visibility timeout.
29+
*/
30+
static final int DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS = 100;
31+
32+
/**
33+
* The default multiplier, which doubles the visibility timeout.
34+
*/
35+
static final double DEFAULT_MULTIPLIER = 2.0;
36+
/**
37+
* The default increment used by {@link LinearBackoffErrorHandler}.
38+
*/
39+
static final int DEFAULT_INCREMENT = 2;
40+
41+
static final int DEFAULT_MAX_VISIBILITY_TIMEOUT_SECONDS = Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS;
42+
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ErrorHandlerVisibilityHelper.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.stream.Collectors;
2828
import org.springframework.messaging.Message;
29+
import org.springframework.util.Assert;
2930

3031
/**
3132
* Utility methods for Error Handler.
@@ -58,4 +59,26 @@ public static <T> long getReceiveMessageCount(Message<T> message) {
5859
return Long.parseLong(MessageHeaderUtils.getHeaderAsString(message,
5960
SqsHeaders.MessageSystemAttributes.SQS_APPROXIMATE_RECEIVE_COUNT));
6061
}
62+
63+
public static int calculateVisibilityTimeoutExponentially(long receiveMessageCount,
64+
int initialVisibilityTimeoutSeconds, double multiplier, int maxVisibilityTimeoutSeconds) {
65+
double timeout = initialVisibilityTimeoutSeconds * Math.pow(multiplier, receiveMessageCount - 1);
66+
int capped = (int) Math.min(timeout, (long) Integer.MAX_VALUE);
67+
return Math.min(capped, maxVisibilityTimeoutSeconds);
68+
}
69+
70+
public static int calculateVisibilityTimeoutLinearly(long receiveMessageCount, int initialVisibilityTimeoutSeconds,
71+
int increment, int maxVisibilityTimeoutSeconds) {
72+
long timeout = initialVisibilityTimeoutSeconds + increment * (receiveMessageCount - 1);
73+
int capped = (int) Math.min(timeout, Integer.MAX_VALUE);
74+
return Math.min(capped, maxVisibilityTimeoutSeconds);
75+
}
76+
77+
public static void checkVisibilityTimeout(long visibilityTimeout) {
78+
Assert.isTrue(visibilityTimeout > 0,
79+
() -> "Invalid visibility timeout '" + visibilityTimeout + "'. Should be greater than 0 ");
80+
Assert.isTrue(visibilityTimeout <= Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS,
81+
() -> "Invalid visibility timeout '" + visibilityTimeout + "'. Should be less than or equal to "
82+
+ Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS);
83+
}
6184
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/errorhandler/ExponentialBackoffErrorHandler.java

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,8 @@ private int calculateTimeout(Message<T> message) {
104104
}
105105

106106
private int calculateTimeout(long receiveMessageCount) {
107-
double exponential = initialVisibilityTimeoutSeconds * Math.pow(multiplier, receiveMessageCount - 1);
108-
int seconds = (int) Math.min(exponential, (long) Integer.MAX_VALUE);
109-
return Math.min(seconds, maxVisibilityTimeoutSeconds);
107+
return ErrorHandlerVisibilityHelper.calculateVisibilityTimeoutExponentially(receiveMessageCount,
108+
initialVisibilityTimeoutSeconds, multiplier, maxVisibilityTimeoutSeconds);
110109
}
111110

112111
public static <T> Builder<T> builder() {
@@ -115,22 +114,12 @@ public static <T> Builder<T> builder() {
115114

116115
public static class Builder<T> {
117116

118-
/**
119-
* The default initial visibility timeout.
120-
*/
121-
private static final int DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS = 100;
122-
123-
/**
124-
* The default multiplier, which doubles the visibility timeout.
125-
*/
126-
private static final double DEFAULT_MULTIPLIER = 2.0;
127-
128-
private int initialVisibilityTimeoutSeconds = DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS;
129-
private double multiplier = DEFAULT_MULTIPLIER;
130-
private int maxVisibilityTimeoutSeconds = Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS;
117+
private int initialVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_INITIAL_VISIBILITY_TIMEOUT_SECONDS;
118+
private double multiplier = BackoffVisibilityConstants.DEFAULT_MULTIPLIER;
119+
private int maxVisibilityTimeoutSeconds = BackoffVisibilityConstants.DEFAULT_MAX_VISIBILITY_TIMEOUT_SECONDS;
131120

132121
public Builder<T> initialVisibilityTimeoutSeconds(int initialVisibilityTimeoutSeconds) {
133-
checkVisibilityTimeout(initialVisibilityTimeoutSeconds);
122+
ErrorHandlerVisibilityHelper.checkVisibilityTimeout(initialVisibilityTimeoutSeconds);
134123
this.initialVisibilityTimeoutSeconds = initialVisibilityTimeoutSeconds;
135124
return this;
136125
}
@@ -143,24 +132,16 @@ public Builder<T> multiplier(double multiplier) {
143132
}
144133

145134
public Builder<T> maxVisibilityTimeoutSeconds(int maxVisibilityTimeoutSeconds) {
146-
checkVisibilityTimeout(maxVisibilityTimeoutSeconds);
135+
ErrorHandlerVisibilityHelper.checkVisibilityTimeout(maxVisibilityTimeoutSeconds);
147136
this.maxVisibilityTimeoutSeconds = maxVisibilityTimeoutSeconds;
148137
return this;
149138
}
150139

151140
public ExponentialBackoffErrorHandler<T> build() {
152141
Assert.isTrue(initialVisibilityTimeoutSeconds <= maxVisibilityTimeoutSeconds,
153142
"Initial visibility timeout must not exceed max visibility timeout");
154-
return new ExponentialBackoffErrorHandler<T>(initialVisibilityTimeoutSeconds, multiplier,
143+
return new ExponentialBackoffErrorHandler<>(initialVisibilityTimeoutSeconds, multiplier,
155144
maxVisibilityTimeoutSeconds);
156145
}
157-
158-
private void checkVisibilityTimeout(long visibilityTimeout) {
159-
Assert.isTrue(visibilityTimeout > 0,
160-
() -> "Invalid visibility timeout '" + visibilityTimeout + "'. Should be greater than 0 ");
161-
Assert.isTrue(visibilityTimeout <= Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS,
162-
() -> "Invalid visibility timeout '" + visibilityTimeout + "'. Should be less than or equal to "
163-
+ Visibility.MAX_VISIBILITY_TIMEOUT_SECONDS);
164-
}
165146
}
166147
}

0 commit comments

Comments
 (0)