-
-
Notifications
You must be signed in to change notification settings - Fork 338
Add FilteringAdapter in SQS #1388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 11 commits
89084dd
2f0917f
2408678
a353fc4
ace28e9
8d0e377
cae2dab
3ea1f5c
708de95
d5cecc7
dcb93e9
37e7ee7
1f97ca4
5b8099a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,8 @@ | |
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionException; | ||
import java.util.function.Supplier; | ||
|
||
import io.awspring.cloud.sqs.support.filter.MessageFilter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.core.task.TaskExecutor; | ||
|
@@ -76,6 +78,19 @@ public static <T> AsyncMessageListener<T> adapt(MessageListener<T> messageListen | |
return new BlockingMessageListenerAdapter<>(messageListener); | ||
} | ||
|
||
/** | ||
* Adapt the provided {@link MessageListener} and {@link MessageFilter} into a single {@link AsyncMessageListener} | ||
* that only forwards messages passing the filter. | ||
* | ||
* @param messageListener the message listener to be adapted | ||
* @param messageFilter the filter used to evaluate incoming messages | ||
* @param <T> the message payload type | ||
* @return the adapted and filtered async message listener | ||
*/ | ||
public static <T> AsyncMessageListener<T> adaptFilter(MessageListener<T> messageListener, MessageFilter<T> messageFilter) { | ||
imsosleepy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return new FilteredMessageListenerAdapter<>(messageListener, messageFilter); | ||
} | ||
|
||
public static <T> AsyncAcknowledgementResultCallback<T> adapt( | ||
AcknowledgementResultCallback<T> acknowledgementResultCallback) { | ||
return new BlockingAcknowledgementResultCallbackAdapter<>(acknowledgementResultCallback); | ||
|
@@ -214,6 +229,42 @@ public CompletableFuture<Void> onMessage(Collection<Message<T>> messages) { | |
} | ||
} | ||
|
||
private static class FilteredMessageListenerAdapter<T> extends AbstractThreadingComponentAdapter | ||
implements AsyncMessageListener<T> { | ||
|
||
private final MessageListener<T> filteredMessageListener; | ||
private final MessageFilter<T> filter; | ||
|
||
public FilteredMessageListenerAdapter(MessageListener<T> filteredMessageListener, MessageFilter<T> filter) { | ||
this.filteredMessageListener = filteredMessageListener; | ||
this.filter = filter; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Void> onMessage(Message<T> message) { | ||
if (filter.process(message)) { | ||
return execute(() -> this.filteredMessageListener.onMessage(message)); | ||
} | ||
else { | ||
return CompletableFuture.completedFuture(null); | ||
} | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Void> onMessage(Collection<Message<T>> messages) { | ||
Collection<Message<T>> filteredMessages = messages.stream() | ||
.filter(filter::process) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Neat pick again but please switch to for. Since size will be always maximum 10 lets switch to for loop to take benefit of performance. |
||
.toList(); | ||
|
||
if (filteredMessages.isEmpty()) { | ||
return CompletableFuture.completedFuture(null); | ||
} | ||
|
||
return execute(() -> this.filteredMessageListener.onMessage(filteredMessages)); | ||
} | ||
} | ||
|
||
|
||
private static class BlockingErrorHandlerAdapter<T> extends AbstractThreadingComponentAdapter | ||
implements AsyncErrorHandler<T> { | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package io.awspring.cloud.sqs.support.filter; | ||
|
||
import org.springframework.messaging.Message; | ||
|
||
public class DefaultMessageFilter<T> implements MessageFilter<T> { | ||
@Override | ||
public boolean process(Message<T> message) { | ||
return true; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package io.awspring.cloud.sqs.support.filter; | ||
|
||
import org.springframework.messaging.Message; | ||
|
||
public interface MessageFilter<T> { | ||
boolean process(Message<T> message); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On a second thought, I think the interface should receive and return a Something like: public interface MessageFilter<T> {
Collection<Message<T>> filter(Collection<Message<T>> messages);
} The reason is that if the filter includes some I/O (e.g. querying the DB to check if the users exist, or making an http request), that'll be more performant as a batch query than in a for loop, and if the user is receiving e.g. 500 messages in a batch that can make a sizeable difference. If the user wants to filter individual messages, it's simple enough for them to iterate on the collection themselves. What do you folks think? |
||
} |
Uh oh!
There was an error while loading. Please reload this page.