Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions examples/streams/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
StreamSpecification,
)

MESSAGES_TO_PUBLISH = 1
MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):
Expand Down Expand Up @@ -87,7 +87,7 @@ def main() -> None:
queue_name = "example-queue"

print("connection to amqp server")
environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True)
environment = Environment("amqp://guest:guest@localhost:5672/")
connection = create_connection(environment)

management = connection.management()
Expand All @@ -98,16 +98,14 @@ def main() -> None:

consumer_connection = create_connection(environment)

stream_filter_options = StreamOptions()
# can be first, last, next or an offset long
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
stream_filter_options.offset(OffsetSpecification.first)
stream_filter_options.filter_values(["banana"])

consumer = consumer_connection.consumer(
addr_queue,
message_handler=MyMessageHandler(),
stream_filter_options=stream_filter_options,
# can be first, last, next or an offset long
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
stream_filter_options=StreamOptions(
offset_specification=OffsetSpecification.first, filters=["banana"]
),
)
print(
"create a consumer and consume the test message - press control + c to terminate to consume"
Expand Down
32 changes: 28 additions & 4 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Dict, Optional, Union

from .common import ExchangeType, QueueType
from .exceptions import ValidationCodeException
from .qpid.proton._data import Described, symbol

STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
Expand Down Expand Up @@ -156,12 +157,35 @@ class StreamOptions:

Attributes:
_filter_set: Dictionary of stream filter specifications

Args:
offset_specification: Either an OffsetSpecification enum value or
an integer offset
filters: List of filter strings to apply to the stream
"""

def __init__(self): # type: ignore
def __init__(
self,
offset_specification: Optional[Union[OffsetSpecification, int]] = None,
filters: Optional[list[str]] = None,
filter_match_unfiltered: bool = False,
):

if offset_specification is None and filters is None:
raise ValidationCodeException(
"At least one between offset_specification and filters must be set when setting up filtering"
)
self._filter_set: Dict[symbol, Described] = {}
if offset_specification is not None:
self._offset(offset_specification)

if filters is not None:
self._filter_values(filters)

if filter_match_unfiltered is True:
self._filter_match_unfiltered(filter_match_unfiltered)

def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
"""
Set the offset specification for the stream.

Expand All @@ -178,7 +202,7 @@ def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
symbol(STREAM_OFFSET_SPEC), offset_specification.name
)

def filter_values(self, filters: list[str]) -> None:
def _filter_values(self, filters: list[str]) -> None:
"""
Set the filter values for the stream.

Expand All @@ -189,7 +213,7 @@ def filter_values(self, filters: list[str]) -> None:
symbol(STREAM_FILTER_SPEC), filters
)

def filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None:
def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None:
"""
Set whether to match unfiltered messages.

Expand Down
48 changes: 15 additions & 33 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,16 @@ def test_stream_read_from_last(

addr_queue = AddressHelper.queue_address(stream_name)

stream_filter_options = StreamOptions()
stream_filter_options.offset(OffsetSpecification.last)

# consume and then publish
try:
connection_consumer = environment.connection()
connection_consumer.dial()
consumer = connection_consumer.consumer(
addr_queue,
message_handler=MyMessageHandlerAcceptStreamOffset(),
stream_filter_options=stream_filter_options,
stream_filter_options=StreamOptions(
offset_specification=OffsetSpecification.last
),
)
publish_messages(connection, messages_to_send, stream_name)
consumer.run()
Expand Down Expand Up @@ -107,16 +106,13 @@ def test_stream_read_from_offset_zero(
# publish and then consume
publish_messages(connection, messages_to_send, stream_name)

stream_filter_options = StreamOptions()
stream_filter_options.offset(0)

try:
connection_consumer = environment.connection()
connection_consumer.dial()
consumer = connection_consumer.consumer(
addr_queue,
message_handler=MyMessageHandlerAcceptStreamOffset(0),
stream_filter_options=stream_filter_options,
stream_filter_options=StreamOptions(offset_specification=0),
)

consumer.run()
Expand Down Expand Up @@ -148,16 +144,13 @@ def test_stream_read_from_offset_first(
# publish and then consume
publish_messages(connection, messages_to_send, stream_name)

stream_filter_options = StreamOptions()
stream_filter_options.offset(OffsetSpecification.first)

try:
connection_consumer = environment.connection()
connection_consumer.dial()
consumer = connection_consumer.consumer(
addr_queue,
message_handler=MyMessageHandlerAcceptStreamOffset(0),
stream_filter_options=stream_filter_options,
stream_filter_options=StreamOptions(OffsetSpecification.first),
)

consumer.run()
Expand Down Expand Up @@ -189,16 +182,13 @@ def test_stream_read_from_offset_ten(
# publish and then consume
publish_messages(connection, messages_to_send, stream_name)

stream_filter_options = StreamOptions()
stream_filter_options.offset(10)

try:
connection_consumer = environment.connection()
connection_consumer.dial()
consumer = connection_consumer.consumer(
addr_queue,
message_handler=MyMessageHandlerAcceptStreamOffset(10),
stream_filter_options=stream_filter_options,
stream_filter_options=StreamOptions(offset_specification=10),
)

consumer.run()
Expand Down Expand Up @@ -228,15 +218,13 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N

# consume and then publish
try:
stream_filter_options = StreamOptions()
stream_filter_options.filter_values(["banana"])
connection_consumer = environment.connection()
connection_consumer.dial()

consumer = connection_consumer.consumer(
addr_queue,
message_handler=MyMessageHandlerAcceptStreamOffset(),
stream_filter_options=stream_filter_options,
stream_filter_options=StreamOptions(filters=["banana"]),
)
# send with annotations filter banana
publish_messages(connection, messages_to_send, stream_name, ["banana"])
Expand Down Expand Up @@ -268,15 +256,13 @@ def test_stream_filtering_mixed(

# consume and then publish
try:
stream_filter_options = StreamOptions()
stream_filter_options.filter_values(["banana"])
connection_consumer = environment.connection()
connection_consumer.dial()
consumer = connection_consumer.consumer(
addr_queue,
# check we are reading just from offset 10 as just banana filtering applies
message_handler=MyMessageHandlerAcceptStreamOffset(10),
stream_filter_options=stream_filter_options,
stream_filter_options=StreamOptions(filters=["banana"]),
)
# send with annotations filter apple and then banana
# consumer will read just from offset 10
Expand Down Expand Up @@ -309,13 +295,11 @@ def test_stream_filtering_not_present(
addr_queue = AddressHelper.queue_address(stream_name)

# consume and then publish
stream_filter_options = StreamOptions()
stream_filter_options.filter_values(["apple"])
connection_consumer = environment.connection()
connection_consumer.dial()

consumer = connection_consumer.consumer(
addr_queue, stream_filter_options=stream_filter_options
addr_queue, stream_filter_options=StreamOptions(filters=["apple"])
)
# send with annotations filter banana
publish_messages(connection, messages_to_send, stream_name, ["banana"])
Expand Down Expand Up @@ -351,15 +335,14 @@ def test_stream_match_unfiltered(

# consume and then publish
try:
stream_filter_options = StreamOptions()
stream_filter_options.filter_values(["banana"])
stream_filter_options.filter_match_unfiltered(True)
connection_consumer = environment.connection()
connection_consumer.dial()
consumer = connection_consumer.consumer(
addr_queue,
message_handler=MyMessageHandlerAcceptStreamOffset(),
stream_filter_options=stream_filter_options,
stream_filter_options=StreamOptions(
filters=["banana"], filter_match_unfiltered=True
),
)
# send with annotations filter banana
publish_messages(connection, messages_to_send, stream_name)
Expand Down Expand Up @@ -391,16 +374,15 @@ def test_stream_reconnection(

# consume and then publish
try:
stream_filter_options = StreamOptions()
stream_filter_options.filter_values(["banana"])
stream_filter_options.filter_match_unfiltered(True)
connection_consumer = environment.connection()
connection_consumer.dial()
consumer = connection_consumer.consumer(
addr_queue,
# disconnection and check happens here
message_handler=MyMessageHandlerAcceptStreamOffsetReconnect(),
stream_filter_options=stream_filter_options,
stream_filter_options=StreamOptions(
filters=["banana"], filter_match_unfiltered=True
),
)
# send with annotations filter banana
publish_messages(connection_with_reconnect, messages_to_send, stream_name)
Expand Down