File tree Expand file tree Collapse file tree 3 files changed +39
-1
lines changed Expand file tree Collapse file tree 3 files changed +39
-1
lines changed File renamed without changes.
Original file line number Diff line number Diff line change @@ -92,6 +92,7 @@ def main() -> None:
9292
9393 stream_filter_options = StreamFilterOptions ()
9494 # can be first, last, next or an offset long
95+ # you can also specify stream filters
9596 stream_filter_options .offset (OffsetSpecification .first )
9697
9798 consumer = consumer_connection .consumer (
@@ -118,7 +119,7 @@ def main() -> None:
118119
119120 #
120121 print ("delete queue" )
121- # management.delete_queue(queue_name)
122+ management .delete_queue (queue_name )
122123
123124 print ("closing connections" )
124125 management .close ()
Original file line number Diff line number Diff line change @@ -266,3 +266,40 @@ def test_stream_filtering_not_present(connection: Connection) -> None:
266266 management .delete_queue (stream_name )
267267
268268 assert raised is True
269+
270+
271+ def test_stream_match_unfiltered (connection : Connection ) -> None :
272+
273+ consumer = None
274+ stream_name = "test_stream_info_with_filtering"
275+ messages_to_send = 10
276+
277+ queue_specification = StreamSpecification (
278+ name = stream_name ,
279+ )
280+ management = connection .management ()
281+ management .declare_queue (queue_specification )
282+
283+ addr_queue = AddressHelper .queue_address (stream_name )
284+
285+ # consume and then publish
286+ try :
287+ stream_filter_options = StreamFilterOptions ()
288+ stream_filter_options .apply_filters (["banana" ])
289+ stream_filter_options .filter_match_unfiltered (True )
290+ connection_consumer = create_connection ()
291+ consumer = connection_consumer .consumer (
292+ addr_queue ,
293+ handler = MyMessageHandlerAcceptStreamOffset (),
294+ stream_filter_options = stream_filter_options ,
295+ )
296+ # send with annotations filter banana
297+ publish_messages (connection , messages_to_send , stream_name )
298+ consumer .run ()
299+ # ack to terminate the consumer
300+ except ConsumerTestException :
301+ pass
302+
303+ consumer .close ()
304+
305+ management .delete_queue (stream_name )
You can’t perform that action at this time.
0 commit comments