|
| 1 | +package io.awspring.cloud.sqs.sample; |
| 2 | + |
| 3 | +import io.awspring.cloud.sqs.annotation.SqsListener; |
| 4 | +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; |
| 5 | +import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement; |
| 6 | +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; |
| 7 | +import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode; |
| 8 | +import io.awspring.cloud.sqs.operations.SqsTemplate; |
| 9 | +import org.slf4j.Logger; |
| 10 | +import org.slf4j.LoggerFactory; |
| 11 | +import org.springframework.boot.ApplicationRunner; |
| 12 | +import org.springframework.context.annotation.Bean; |
| 13 | +import org.springframework.context.annotation.Configuration; |
| 14 | +import org.springframework.messaging.Message; |
| 15 | +import software.amazon.awssdk.services.sqs.SqsAsyncClient; |
| 16 | + |
| 17 | +import java.time.Duration; |
| 18 | +import java.time.OffsetDateTime; |
| 19 | +import java.util.Collection; |
| 20 | +import java.util.UUID; |
| 21 | + |
| 22 | +@Configuration |
| 23 | +public class SqsManualAckSample { |
| 24 | + |
| 25 | + public static final String NEW_USER_QUEUE = "new-user-queue"; |
| 26 | + |
| 27 | + private static final Logger LOGGER = LoggerFactory.getLogger(SqsManualAckSample.class); |
| 28 | + |
| 29 | + @Bean |
| 30 | + public ApplicationRunner sendMessageToQueue(SqsTemplate sqsTemplate) { |
| 31 | + LOGGER.info("Sending message"); |
| 32 | + return args -> sqsTemplate.send(to -> to.queue(NEW_USER_QUEUE) |
| 33 | + .payload(new User(UUID.randomUUID(), "John")) |
| 34 | + ); |
| 35 | + } |
| 36 | + |
| 37 | + @Bean |
| 38 | + public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient) { |
| 39 | + return SqsTemplate.builder() |
| 40 | + .sqsAsyncClient(sqsAsyncClient) |
| 41 | + .build(); |
| 42 | + } |
| 43 | + |
| 44 | + @SqsListener(NEW_USER_QUEUE) |
| 45 | + public void listen(Message<User> message) { |
| 46 | + LOGGER.info("Message received on listen method at {}", OffsetDateTime.now()); |
| 47 | + Acknowledgement.acknowledge(message); |
| 48 | + } |
| 49 | + |
| 50 | + @Bean |
| 51 | + SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) { |
| 52 | + return SqsMessageListenerContainerFactory |
| 53 | + .builder() |
| 54 | + .configure(options -> options |
| 55 | + .acknowledgementMode(AcknowledgementMode.MANUAL) |
| 56 | + .acknowledgementInterval(Duration.ofSeconds(3)) // NOTE: With acknowledgementInterval 3 seconds, we can batch and ack async. |
| 57 | + .acknowledgementThreshold(0) |
| 58 | + ) |
| 59 | + .acknowledgementResultCallback(new AckResultCallback()) |
| 60 | + .sqsAsyncClient(sqsAsyncClient) |
| 61 | + .build(); |
| 62 | + } |
| 63 | + |
| 64 | + public record User(UUID id, String name) { |
| 65 | + } |
| 66 | + |
| 67 | + static class AckResultCallback implements AcknowledgementResultCallback<Object> { |
| 68 | + @Override |
| 69 | + public void onSuccess(Collection<Message<Object>> messages) { |
| 70 | + LOGGER.info("Ack with success at {}", OffsetDateTime.now()); } |
| 71 | + |
| 72 | + @Override |
| 73 | + public void onFailure(Collection<Message<Object>> messages, Throwable t) { |
| 74 | + LOGGER.error("Ack with fail", t); |
| 75 | + } |
| 76 | + } |
| 77 | +} |
0 commit comments