@@ -231,6 +231,45 @@ def test_stream_filtering(connection: Connection) -> None:
231231 management .delete_queue (stream_name )
232232
233233
234+ def test_stream_filtering_mixed (connection : Connection ) -> None :
235+
236+ consumer = None
237+ stream_name = "test_stream_info_with_filtering"
238+ messages_to_send = 10
239+
240+ queue_specification = StreamSpecification (
241+ name = stream_name ,
242+ )
243+ management = connection .management ()
244+ management .declare_queue (queue_specification )
245+
246+ addr_queue = AddressHelper .queue_address (stream_name )
247+
248+ # consume and then publish
249+ try :
250+ stream_filter_options = StreamOptions ()
251+ stream_filter_options .filter_values (["banana" ])
252+ connection_consumer = create_connection ()
253+ consumer = connection_consumer .consumer (
254+ addr_queue ,
255+ # check we are reading just from offset 10 as just banana filtering applies
256+ message_handler = MyMessageHandlerAcceptStreamOffset (10 ),
257+ stream_filter_options = stream_filter_options ,
258+ )
259+ # send with annotations filter apple and then banana
260+ # consumer will read just from offset 10
261+ publish_messages (connection , messages_to_send , stream_name , ["apple" ])
262+ publish_messages (connection , messages_to_send , stream_name , ["banana" ])
263+ consumer .run ()
264+ # ack to terminate the consumer
265+ except ConsumerTestException :
266+ pass
267+
268+ consumer .close ()
269+
270+ management .delete_queue (stream_name )
271+
272+
234273def test_stream_filtering_not_present (connection : Connection ) -> None :
235274
236275 raised = False
0 commit comments