diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index 57cc7c0..cb53e73 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -13,7 +13,7 @@ StreamSpecification, ) -MESSAGES_TO_PUBLISH = 1 +MESSAGES_TO_PUBLISH = 100 class MyMessageHandler(AMQPMessagingHandler): @@ -87,7 +87,7 @@ def main() -> None: queue_name = "example-queue" print("connection to amqp server") - environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True) + environment = Environment("amqp://guest:guest@localhost:5672/") connection = create_connection(environment) management = connection.management() @@ -98,16 +98,14 @@ def main() -> None: consumer_connection = create_connection(environment) - stream_filter_options = StreamOptions() - # can be first, last, next or an offset long - # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered - stream_filter_options.offset(OffsetSpecification.first) - stream_filter_options.filter_values(["banana"]) - consumer = consumer_connection.consumer( addr_queue, message_handler=MyMessageHandler(), - stream_filter_options=stream_filter_options, + # can be first, last, next or an offset long + # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered + stream_filter_options=StreamOptions( + offset_specification=OffsetSpecification.first, filters=["banana"] + ), ) print( "create a consumer and consume the test message - press control + c to terminate to consume" diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 9878a6a..0577375 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -4,6 +4,7 @@ from typing import Any, Dict, Optional, Union from .common import ExchangeType, QueueType +from .exceptions import ValidationCodeException from .qpid.proton._data import Described, symbol STREAM_FILTER_SPEC = "rabbitmq:stream-filter" @@ -156,12 +157,35 @@ class StreamOptions: Attributes: _filter_set: Dictionary of stream filter specifications + + Args: + offset_specification: Either an OffsetSpecification enum value or + an integer offset + filters: List of filter strings to apply to the stream """ - def __init__(self): # type: ignore + def __init__( + self, + offset_specification: Optional[Union[OffsetSpecification, int]] = None, + filters: Optional[list[str]] = None, + filter_match_unfiltered: bool = False, + ): + + if offset_specification is None and filters is None: + raise ValidationCodeException( + "At least one between offset_specification and filters must be set when setting up filtering" + ) self._filter_set: Dict[symbol, Described] = {} + if offset_specification is not None: + self._offset(offset_specification) + + if filters is not None: + self._filter_values(filters) + + if filter_match_unfiltered is True: + self._filter_match_unfiltered(filter_match_unfiltered) - def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: + def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ Set the offset specification for the stream. @@ -178,7 +202,7 @@ def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: symbol(STREAM_OFFSET_SPEC), offset_specification.name ) - def filter_values(self, filters: list[str]) -> None: + def _filter_values(self, filters: list[str]) -> None: """ Set the filter values for the stream. @@ -189,7 +213,7 @@ def filter_values(self, filters: list[str]) -> None: symbol(STREAM_FILTER_SPEC), filters ) - def filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: + def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: """ Set whether to match unfiltered messages. diff --git a/tests/test_streams.py b/tests/test_streams.py index 890ab94..280805d 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -65,9 +65,6 @@ def test_stream_read_from_last( addr_queue = AddressHelper.queue_address(stream_name) - stream_filter_options = StreamOptions() - stream_filter_options.offset(OffsetSpecification.last) - # consume and then publish try: connection_consumer = environment.connection() @@ -75,7 +72,9 @@ def test_stream_read_from_last( consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions( + offset_specification=OffsetSpecification.last + ), ) publish_messages(connection, messages_to_send, stream_name) consumer.run() @@ -107,16 +106,13 @@ def test_stream_read_from_offset_zero( # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamOptions() - stream_filter_options.offset(0) - try: connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(offset_specification=0), ) consumer.run() @@ -148,16 +144,13 @@ def test_stream_read_from_offset_first( # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamOptions() - stream_filter_options.offset(OffsetSpecification.first) - try: connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(0), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(OffsetSpecification.first), ) consumer.run() @@ -189,16 +182,13 @@ def test_stream_read_from_offset_ten( # publish and then consume publish_messages(connection, messages_to_send, stream_name) - stream_filter_options = StreamOptions() - stream_filter_options.offset(10) - try: connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(offset_specification=10), ) consumer.run() @@ -228,15 +218,13 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N # consume and then publish try: - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["banana"]) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(filters=["banana"]), ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name, ["banana"]) @@ -268,15 +256,13 @@ def test_stream_filtering_mixed( # consume and then publish try: - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["banana"]) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, # check we are reading just from offset 10 as just banana filtering applies message_handler=MyMessageHandlerAcceptStreamOffset(10), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions(filters=["banana"]), ) # send with annotations filter apple and then banana # consumer will read just from offset 10 @@ -309,13 +295,11 @@ def test_stream_filtering_not_present( addr_queue = AddressHelper.queue_address(stream_name) # consume and then publish - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["apple"]) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( - addr_queue, stream_filter_options=stream_filter_options + addr_queue, stream_filter_options=StreamOptions(filters=["apple"]) ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name, ["banana"]) @@ -351,15 +335,14 @@ def test_stream_match_unfiltered( # consume and then publish try: - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["banana"]) - stream_filter_options.filter_match_unfiltered(True) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset(), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions( + filters=["banana"], filter_match_unfiltered=True + ), ) # send with annotations filter banana publish_messages(connection, messages_to_send, stream_name) @@ -391,16 +374,15 @@ def test_stream_reconnection( # consume and then publish try: - stream_filter_options = StreamOptions() - stream_filter_options.filter_values(["banana"]) - stream_filter_options.filter_match_unfiltered(True) connection_consumer = environment.connection() connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, # disconnection and check happens here message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(), - stream_filter_options=stream_filter_options, + stream_filter_options=StreamOptions( + filters=["banana"], filter_match_unfiltered=True + ), ) # send with annotations filter banana publish_messages(connection_with_reconnect, messages_to_send, stream_name)