@@ -193,3 +193,76 @@ def test_stream_read_from_offset_ten(connection: Connection) -> None:
193193 consumer .close ()
194194
195195 management .delete_queue (stream_name )
196+
197+
198+ def test_stream_filtering (connection : Connection ) -> None :
199+
200+ consumer = None
201+ stream_name = "test_stream_info_with_filtering"
202+ messages_to_send = 10
203+
204+ queue_specification = StreamSpecification (
205+ name = stream_name ,
206+ )
207+ management = connection .management ()
208+ management .declare_queue (queue_specification )
209+
210+ addr_queue = AddressHelper .queue_address (stream_name )
211+
212+ # consume and then publish
213+ try :
214+ stream_filter_options = StreamFilterOptions ()
215+ stream_filter_options .apply_filters (["banana" ])
216+ connection_consumer = create_connection ()
217+ consumer = connection_consumer .consumer (
218+ addr_queue ,
219+ handler = MyMessageHandlerAcceptStreamOffset (),
220+ stream_filter_options = stream_filter_options ,
221+ )
222+ # send with annotations filter banana
223+ publish_messages (connection , messages_to_send , stream_name , ["banana" ])
224+ consumer .run ()
225+ # ack to terminate the consumer
226+ except ConsumerTestException :
227+ pass
228+
229+ consumer .close ()
230+
231+ management .delete_queue (stream_name )
232+
233+
234+ def test_stream_filtering_not_present (connection : Connection ) -> None :
235+
236+ raised = False
237+ stream_name = "test_stream_info_with_filtering"
238+ messages_to_send = 10
239+
240+ queue_specification = StreamSpecification (
241+ name = stream_name ,
242+ )
243+ management = connection .management ()
244+ management .declare_queue (queue_specification )
245+
246+ addr_queue = AddressHelper .queue_address (stream_name )
247+
248+ # consume and then publish
249+ stream_filter_options = StreamFilterOptions ()
250+ stream_filter_options .apply_filters (["apple" ])
251+ connection_consumer = create_connection ()
252+ consumer = connection_consumer .consumer (
253+ addr_queue , stream_filter_options = stream_filter_options
254+ )
255+ # send with annotations filter banana
256+ publish_messages (connection , messages_to_send , stream_name , ["banana" ])
257+
258+ try :
259+ consumer .consume (timeout = 1 )
260+ except Exception :
261+ # valid no message should arrive with filter banana so a timeout exception is raised
262+ raised = True
263+
264+ consumer .close ()
265+
266+ management .delete_queue (stream_name )
267+
268+ assert raised is True
0 commit comments