File tree Expand file tree Collapse file tree 3 files changed +45
-2
lines changed Expand file tree Collapse file tree 3 files changed +45
-2
lines changed File renamed without changes.
Original file line number Diff line number Diff line change @@ -19,7 +19,12 @@ def __init__(self):
1919 self ._count = 0
2020
2121 def on_message (self , event : Event ):
22- print ("received message: " + str (event .message .body ))
22+ print (
23+ "received message from stream: "
24+ + str (event .message .body )
25+ + " with offset: "
26+ + str (event .message .annotations ["x-stream-offset" ])
27+ )
2328
2429 # accepting
2530 self .delivery_context .accept (event )
@@ -92,6 +97,7 @@ def main() -> None:
9297
9398 stream_filter_options = StreamFilterOptions ()
9499 # can be first, last, next or an offset long
100+ # you can also specify stream filters
95101 stream_filter_options .offset (OffsetSpecification .first )
96102
97103 consumer = consumer_connection .consumer (
@@ -118,7 +124,7 @@ def main() -> None:
118124
119125 #
120126 print ("delete queue" )
121- # management.delete_queue(queue_name)
127+ management .delete_queue (queue_name )
122128
123129 print ("closing connections" )
124130 management .close ()
Original file line number Diff line number Diff line change @@ -266,3 +266,40 @@ def test_stream_filtering_not_present(connection: Connection) -> None:
266266 management .delete_queue (stream_name )
267267
268268 assert raised is True
269+
270+
271+ def test_stream_match_unfiltered (connection : Connection ) -> None :
272+
273+ consumer = None
274+ stream_name = "test_stream_info_with_filtering"
275+ messages_to_send = 10
276+
277+ queue_specification = StreamSpecification (
278+ name = stream_name ,
279+ )
280+ management = connection .management ()
281+ management .declare_queue (queue_specification )
282+
283+ addr_queue = AddressHelper .queue_address (stream_name )
284+
285+ # consume and then publish
286+ try :
287+ stream_filter_options = StreamFilterOptions ()
288+ stream_filter_options .apply_filters (["banana" ])
289+ stream_filter_options .filter_match_unfiltered (True )
290+ connection_consumer = create_connection ()
291+ consumer = connection_consumer .consumer (
292+ addr_queue ,
293+ handler = MyMessageHandlerAcceptStreamOffset (),
294+ stream_filter_options = stream_filter_options ,
295+ )
296+ # send with annotations filter banana
297+ publish_messages (connection , messages_to_send , stream_name )
298+ consumer .run ()
299+ # ack to terminate the consumer
300+ except ConsumerTestException :
301+ pass
302+
303+ consumer .close ()
304+
305+ management .delete_queue (stream_name )
You can’t perform that action at this time.
0 commit comments