File tree Expand file tree Collapse file tree 2 files changed +19
-3
lines changed
src/test/java/com/rabbitmq/client/amqp Expand file tree Collapse file tree 2 files changed +19
-3
lines changed Original file line number Diff line number Diff line change @@ -104,6 +104,16 @@ void publishAddressFormatInMessages() {
104104 .message ();
105105 }
106106
107+ void publishStreamFiltering () {
108+ Publisher publisher = null ;
109+ byte [] body = null ;
110+ Message message = publisher .message (body )
111+ .annotation ("x-stream-filter-value" , "invoices" );
112+ publisher .publish (message , context -> {
113+ // confirm callback
114+ });
115+ }
116+
107117 void consuming () {
108118 Connection connection = null ;
109119 Consumer consumer = connection .consumerBuilder ()
@@ -147,8 +157,13 @@ void consumingStreamFiltering() {
147157 .filterValues ("invoices" , "orders" )
148158 .filterMatchUnfiltered (true )
149159 .builder ()
150- .messageHandler ((context , message ) -> {
151- // message processing
160+ .messageHandler ((ctx , msg ) -> {
161+ String filterValue = (String ) msg .annotation ("x-stream-filter-value" );
162+ // there must be some client-side filter logic
163+ if ("invoices" .equals (filterValue ) || "orders" .equals (filterValue )) {
164+ // message processing
165+ }
166+ ctx .accept ();
152167 })
153168 .build ();
154169
Original file line number Diff line number Diff line change @@ -326,7 +326,8 @@ void streamFilteringWithClientSideFiltering() {
326326 .messageHandler (
327327 (ctx , msg ) -> {
328328 receivedCount .incrementAndGet ();
329- if (selection .equals (msg .subject ())) {
329+ if (selection .equals (msg .subject ())
330+ && selection .equals (msg .annotation ("x-stream-filter-value" ))) {
330331 selectedMessageCount .incrementAndGet ();
331332 }
332333 ctx .accept ();
You can’t perform that action at this time.
0 commit comments