-
-
Notifications
You must be signed in to change notification settings - Fork 338
Add FilteringAdapter in SQS #1388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
# Conflicts: # spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java
...-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java
Outdated
Show resolved
Hide resolved
...-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java
Outdated
Show resolved
Hide resolved
...d-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java
Outdated
Show resolved
Hide resolved
spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AsyncComponentAdapters.java
Outdated
Show resolved
Hide resolved
Hey @imsosleepy tnx on making the PR! Will check PR next week since I am on the conference this week and Tomaz is on vacation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @imsosleepy,
Tnx on PR to me this looks good.
I left 2 comments neat picks nothing big and have few questions.
I would like to see more in terms of filter, this can be done later or in this PR. Firstly what I would like to ask if you are okey on working on this in this PR to extend it or we tackle this in new PR? :)
Second @tomazfernandes please check if you agree with it.
-
I think we should support something like this.
@SqsListener(filterMethod = "filterMethodBean"
) -
@tomazfernandes just to make sure since this is happening inside container and we are actually not allowing messageListener impl call and ack will be autoamatic in default case?
...-sqs/src/main/java/io/awspring/cloud/sqs/config/AbstractMessageListenerContainerFactory.java
Show resolved
Hide resolved
@Override | ||
public CompletableFuture<Void> onMessage(Collection<Message<T>> messages) { | ||
Collection<Message<T>> filteredMessages = messages.stream() | ||
.filter(filter::process) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neat pick again but please switch to for. Since size will be always maximum 10 lets switch to for loop to take benefit of performance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @imsosleepy for the changes and @MatejNedic for looking into it!
The MessageFilter
interface looks good.
I'm not sureabout filtering messages at the message listener level since we'd still have the message go through the interceptors and error handlers.
I believe it would make more sense to filter it at the Sink level, so the message never gets emitted and doesn't go through the other components, we just acknowledge it.
What do you folks think?
@tomazfernandes I agree makes more sense to me as well. Abstraction What about Are you sure message will go through error handler if it ends without exception in filter tho? |
There's nothing really you can do through the annotation that you can't do with the For this particular case, I would suggest we stick with the What do you think, makes sense? |
Yeah, if it doesn't return an error it won't go through the error handler, but if it does return an error, we'll now have a message we don't want to process in the error handler, or perhaps no message at all which would likely trigger another error. If we prevent the message from entering the processing pipeline altogether I believe it'll be simpler to manage the outcomes. |
Thanks so much for the detailed feedback @tomazfernandes and @MatejNedic! I agree that filtering at the Sink level makes a lot more sense and helps avoid unnecessary processing through interceptors and error handlers. That said, I’m still wrapping my head around the architecture around MessageSink and how exactly to plug in filtering at that level. If you have a more concrete suggestion or example of where the filtering logic should live and how it should be wired into the container, that would help me move faster and avoid misalignment. I’ll dig a bit deeper on my end in the meantime. Thanks again for the guidance — really appreciate it! |
As suggested, I’ve moved the message filtering logic to the MessageSink layer to avoid unnecessary processing through interceptors and error handlers. Before proceeding with additional test for batch and FIFO modes, I’d appreciate a review of the current structure to ensure we're aligned. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM flexible easy to use I like it!
Hey @tomazfernandes can you check this PR when you get time? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @imsosleepy @MatejNedic, I apologize for the delay.
I left a few comments, let me know your thoughts!
import org.springframework.messaging.Message; | ||
|
||
public interface MessageFilter<T> { | ||
boolean process(Message<T> message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a second thought, I think the interface should receive and return a Collection
of messages rather than a single one.
Something like:
public interface MessageFilter<T> {
Collection<Message<T>> filter(Collection<Message<T>> messages);
}
The reason is that if the filter includes some I/O (e.g. querying the DB to check if the users exist, or making an http request), that'll be more performant as a batch query than in a for loop, and if the user is receiving e.g. 500 messages in a batch that can make a sizeable difference.
If the user wants to filter individual messages, it's simple enough for them to iterate on the collection themselves.
What do you folks think?
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
public class FilteredBatchMessageSink<T> extends BatchMessageSink<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The MessageSink
is a composable component, so we should favor composition over inheritance.
In practical terms, it can extend from AbstractDelegatingMessageListeningSinkAdapter
instead - see MessageGroupingSinkAdapter
for an example and how it's assembled in the ContainerFactory
.
Makes sense?
|
||
@Override | ||
protected CompletableFuture<Void> doEmit(Collection<Message<T>> messages, MessageProcessingContext<T> context) { | ||
List<Message<T>> filtered = new ArrayList<>(messages.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part of the code needs to be non-blocking, otherwise it may block the polling thread e.g. if the filter makes a request or DB query.
I'm not 100% sure what's the best approach, but we could:
- Add the filtering logic in the
execute
method inAbstractMessageProcessingPipelineSink
in a way that filtering happens in a separate thread - Make the filtering interface non-blocking (
CompletableFuture
-based)
I would probably go with the first approach.
For that, we'd probably need to have the MessageFilter
in the AbstractMessageProcessingPipelineSink
directly.
Let me know your thoughts.
@tomazfernandes Thanks for the suggestions — I agree with the points you’ve raised on composition over inheritance and non-blocking filtering. I’ll work on these changes over the weekend and share the results. The non-blocking filtering part in the BatchMessageSink area might need a bit more input from you as I work through the details. |
…intainer feedback + add integration tests
@tomazfernandes I’d like to get feedback on whether my current approach makes sense, although FIFO is not yet working.
In summary, the main question is whether the filter should be passed down to I’d appreciate any advice from others. |
📢 Type of change
📜 Description
Added support for filtering SQS messages before they reach the listener using a MessageFilter.
The filter can be configured through SqsMessageListenerContainerFactory, and only messages that match the condition will be passed to the listener.
💡 Motivation and Context
Previously, all incoming messages were forwarded to the listener, requiring manual filtering within business logic.
This change enables declarative filtering at the container level, helping reduce unnecessary processing and keeping listener logic clean.
💚 How did you test it?
Added integration test SqsMessageFilterIntegrationTests to verify that filtered messages are not delivered to the listener.
Confirmed that only messages satisfying the filter are passed through and acknowledged.
📝 Checklist
🔮 Next steps
This initial implementation supports filtering on a single message basis only, and does not yet account for batch or asynchronous filtering scenarios.
As suggested in #1373, we plan to extend support to:
(e.g., message attributes, custom headers, or system metadata)