@@ -261,6 +261,50 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N
261261 management .delete_queue (stream_name )
262262
263263
264+ def test_stream_filtering_mixed (
265+ connection : Connection , environment : Environment
266+ ) -> None :
267+
268+ consumer = None
269+ stream_name = "test_stream_info_with_filtering"
270+ messages_to_send = 10
271+
272+ queue_specification = StreamSpecification (
273+ name = stream_name ,
274+ )
275+ management = connection .management ()
276+ management .declare_queue (queue_specification )
277+
278+ addr_queue = AddressHelper .queue_address (stream_name )
279+
280+ # consume and then publish
281+ try :
282+ stream_filter_options = StreamOptions ()
283+ stream_filter_options .filter_values (["banana" ])
284+ connection_consumer = environment .connection (
285+ "amqp://guest:guest@localhost:5672/"
286+ )
287+ connection_consumer .dial ()
288+ consumer = connection_consumer .consumer (
289+ addr_queue ,
290+ # check we are reading just from offset 10 as just banana filtering applies
291+ message_handler = MyMessageHandlerAcceptStreamOffset (10 ),
292+ stream_filter_options = stream_filter_options ,
293+ )
294+ # send with annotations filter apple and then banana
295+ # consumer will read just from offset 10
296+ publish_messages (connection , messages_to_send , stream_name , ["apple" ])
297+ publish_messages (connection , messages_to_send , stream_name , ["banana" ])
298+ consumer .run ()
299+ # ack to terminate the consumer
300+ except ConsumerTestException :
301+ pass
302+
303+ consumer .close ()
304+
305+ management .delete_queue (stream_name )
306+
307+
264308def test_stream_filtering_not_present (
265309 connection : Connection , environment : Environment
266310) -> None :
0 commit comments