@@ -65,17 +65,16 @@ def test_stream_read_from_last(
6565
6666 addr_queue = AddressHelper .queue_address (stream_name )
6767
68- stream_filter_options = StreamOptions ()
69- stream_filter_options .offset (OffsetSpecification .last )
70-
7168 # consume and then publish
7269 try :
7370 connection_consumer = environment .connection ()
7471 connection_consumer .dial ()
7572 consumer = connection_consumer .consumer (
7673 addr_queue ,
7774 message_handler = MyMessageHandlerAcceptStreamOffset (),
78- stream_filter_options = stream_filter_options ,
75+ stream_filter_options = StreamOptions (
76+ offset_specification = OffsetSpecification .last
77+ ),
7978 )
8079 publish_messages (connection , messages_to_send , stream_name )
8180 consumer .run ()
@@ -107,16 +106,13 @@ def test_stream_read_from_offset_zero(
107106 # publish and then consume
108107 publish_messages (connection , messages_to_send , stream_name )
109108
110- stream_filter_options = StreamOptions ()
111- stream_filter_options .offset (0 )
112-
113109 try :
114110 connection_consumer = environment .connection ()
115111 connection_consumer .dial ()
116112 consumer = connection_consumer .consumer (
117113 addr_queue ,
118114 message_handler = MyMessageHandlerAcceptStreamOffset (0 ),
119- stream_filter_options = stream_filter_options ,
115+ stream_filter_options = StreamOptions ( offset_specification = 0 ) ,
120116 )
121117
122118 consumer .run ()
@@ -148,16 +144,13 @@ def test_stream_read_from_offset_first(
148144 # publish and then consume
149145 publish_messages (connection , messages_to_send , stream_name )
150146
151- stream_filter_options = StreamOptions ()
152- stream_filter_options .offset (OffsetSpecification .first )
153-
154147 try :
155148 connection_consumer = environment .connection ()
156149 connection_consumer .dial ()
157150 consumer = connection_consumer .consumer (
158151 addr_queue ,
159152 message_handler = MyMessageHandlerAcceptStreamOffset (0 ),
160- stream_filter_options = stream_filter_options ,
153+ stream_filter_options = StreamOptions ( OffsetSpecification . first ) ,
161154 )
162155
163156 consumer .run ()
@@ -189,16 +182,13 @@ def test_stream_read_from_offset_ten(
189182 # publish and then consume
190183 publish_messages (connection , messages_to_send , stream_name )
191184
192- stream_filter_options = StreamOptions ()
193- stream_filter_options .offset (10 )
194-
195185 try :
196186 connection_consumer = environment .connection ()
197187 connection_consumer .dial ()
198188 consumer = connection_consumer .consumer (
199189 addr_queue ,
200190 message_handler = MyMessageHandlerAcceptStreamOffset (10 ),
201- stream_filter_options = stream_filter_options ,
191+ stream_filter_options = StreamOptions ( offset_specification = 10 ) ,
202192 )
203193
204194 consumer .run ()
@@ -228,15 +218,13 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N
228218
229219 # consume and then publish
230220 try :
231- stream_filter_options = StreamOptions ()
232- stream_filter_options .filter_values (["banana" ])
233221 connection_consumer = environment .connection ()
234222 connection_consumer .dial ()
235223
236224 consumer = connection_consumer .consumer (
237225 addr_queue ,
238226 message_handler = MyMessageHandlerAcceptStreamOffset (),
239- stream_filter_options = stream_filter_options ,
227+ stream_filter_options = StreamOptions ( stream_filters = [ "banana" ]) ,
240228 )
241229 # send with annotations filter banana
242230 publish_messages (connection , messages_to_send , stream_name , ["banana" ])
@@ -268,15 +256,13 @@ def test_stream_filtering_mixed(
268256
269257 # consume and then publish
270258 try :
271- stream_filter_options = StreamOptions ()
272- stream_filter_options .filter_values (["banana" ])
273259 connection_consumer = environment .connection ()
274260 connection_consumer .dial ()
275261 consumer = connection_consumer .consumer (
276262 addr_queue ,
277263 # check we are reading just from offset 10 as just banana filtering applies
278264 message_handler = MyMessageHandlerAcceptStreamOffset (10 ),
279- stream_filter_options = stream_filter_options ,
265+ stream_filter_options = StreamOptions ( stream_filters = [ "banana" ]) ,
280266 )
281267 # send with annotations filter apple and then banana
282268 # consumer will read just from offset 10
@@ -309,13 +295,11 @@ def test_stream_filtering_not_present(
309295 addr_queue = AddressHelper .queue_address (stream_name )
310296
311297 # consume and then publish
312- stream_filter_options = StreamOptions ()
313- stream_filter_options .filter_values (["apple" ])
314298 connection_consumer = environment .connection ()
315299 connection_consumer .dial ()
316300
317301 consumer = connection_consumer .consumer (
318- addr_queue , stream_filter_options = stream_filter_options
302+ addr_queue , stream_filter_options = StreamOptions ( stream_filters = [ "apple" ])
319303 )
320304 # send with annotations filter banana
321305 publish_messages (connection , messages_to_send , stream_name , ["banana" ])
@@ -351,15 +335,14 @@ def test_stream_match_unfiltered(
351335
352336 # consume and then publish
353337 try :
354- stream_filter_options = StreamOptions ()
355- stream_filter_options .filter_values (["banana" ])
356- stream_filter_options .filter_match_unfiltered (True )
357338 connection_consumer = environment .connection ()
358339 connection_consumer .dial ()
359340 consumer = connection_consumer .consumer (
360341 addr_queue ,
361342 message_handler = MyMessageHandlerAcceptStreamOffset (),
362- stream_filter_options = stream_filter_options ,
343+ stream_filter_options = StreamOptions (
344+ stream_filters = ["banana" ], filter_match_unfiltered = True
345+ ),
363346 )
364347 # send with annotations filter banana
365348 publish_messages (connection , messages_to_send , stream_name )
@@ -391,16 +374,15 @@ def test_stream_reconnection(
391374
392375 # consume and then publish
393376 try :
394- stream_filter_options = StreamOptions ()
395- stream_filter_options .filter_values (["banana" ])
396- stream_filter_options .filter_match_unfiltered (True )
397377 connection_consumer = environment .connection ()
398378 connection_consumer .dial ()
399379 consumer = connection_consumer .consumer (
400380 addr_queue ,
401381 # disconnection and check happens here
402382 message_handler = MyMessageHandlerAcceptStreamOffsetReconnect (),
403- stream_filter_options = stream_filter_options ,
383+ stream_filter_options = StreamOptions (
384+ stream_filters = ["banana" ], filter_match_unfiltered = True
385+ ),
404386 )
405387 # send with annotations filter banana
406388 publish_messages (connection_with_reconnect , messages_to_send , stream_name )
0 commit comments