@@ -261,6 +261,45 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N
261261 management .delete_queue (stream_name )
262262
263263
264+ def test_stream_filtering_mixed (connection : Connection ) -> None :
265+
266+ consumer = None
267+ stream_name = "test_stream_info_with_filtering"
268+ messages_to_send = 10
269+
270+ queue_specification = StreamSpecification (
271+ name = stream_name ,
272+ )
273+ management = connection .management ()
274+ management .declare_queue (queue_specification )
275+
276+ addr_queue = AddressHelper .queue_address (stream_name )
277+
278+ # consume and then publish
279+ try :
280+ stream_filter_options = StreamOptions ()
281+ stream_filter_options .filter_values (["banana" ])
282+ connection_consumer = create_connection ()
283+ consumer = connection_consumer .consumer (
284+ addr_queue ,
285+ # check we are reading just from offset 10 as just banana filtering applies
286+ message_handler = MyMessageHandlerAcceptStreamOffset (10 ),
287+ stream_filter_options = stream_filter_options ,
288+ )
289+ # send with annotations filter apple and then banana
290+ # consumer will read just from offset 10
291+ publish_messages (connection , messages_to_send , stream_name , ["apple" ])
292+ publish_messages (connection , messages_to_send , stream_name , ["banana" ])
293+ consumer .run ()
294+ # ack to terminate the consumer
295+ except ConsumerTestException :
296+ pass
297+
298+ consumer .close ()
299+
300+ management .delete_queue (stream_name )
301+
302+
264303def test_stream_filtering_not_present (
265304 connection : Connection , environment : Environment
266305) -> None :
0 commit comments