Skip to content

Commit 36bb249

Browse files
authored
refactor consumer stream options (#75)
* refactor the stream consumer options and the stream filter options. This implementation is similar to the golang implementation. So the idea is to have the same API ( as much as possible ) for all the amqp 1.0. clients. Add a new class StreamFilterOptions to configure only the filtering and leave the StreamConsumerOptions for offset and other future implementations --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 43159ae commit 36bb249

File tree

8 files changed

+96
-57
lines changed

8 files changed

+96
-57
lines changed

examples/streams/example_with_streams.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
Event,
1111
Message,
1212
OffsetSpecification,
13-
StreamOptions,
13+
StreamConsumerOptions,
1414
StreamSpecification,
1515
)
1616

@@ -104,8 +104,8 @@ def main() -> None:
104104
message_handler=MyMessageHandler(),
105105
# can be first, last, next or an offset long
106106
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
107-
stream_filter_options=StreamOptions(
108-
offset_specification=OffsetSpecification.first, filters=["banana"]
107+
stream_consumer_options=StreamConsumerOptions(
108+
offset_specification=OffsetSpecification.first
109109
),
110110
)
111111
print(

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
OAuth2Options,
1414
OffsetSpecification,
1515
RecoveryConfiguration,
16-
StreamOptions,
16+
StreamConsumerOptions,
1717
)
1818
from .environment import Environment
1919
from .exceptions import (
@@ -85,7 +85,7 @@
8585
"CurrentUserStore",
8686
"PKCS12Store",
8787
"ConnectionClosed",
88-
"StreamOptions",
88+
"StreamConsumerOptions",
8989
"OffsetSpecification",
9090
"OutcomeState",
9191
"Environment",

rabbitmq_amqp_python_client/connection.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from .entities import (
1919
OAuth2Options,
2020
RecoveryConfiguration,
21-
StreamOptions,
21+
StreamConsumerOptions,
2222
)
2323
from .exceptions import (
2424
ArgumentOutOfRangeException,
@@ -363,7 +363,7 @@ def publisher(self, destination: str = "") -> Publisher:
363363
ArgumentOutOfRangeException: If destination address format is invalid
364364
"""
365365
if destination != "":
366-
if validate_address(destination) is False:
366+
if not validate_address(destination):
367367
raise ArgumentOutOfRangeException(
368368
"destination address must start with /queues or /exchanges"
369369
)
@@ -376,7 +376,7 @@ def consumer(
376376
self,
377377
destination: str,
378378
message_handler: Optional[MessagingHandler] = None,
379-
stream_filter_options: Optional[StreamOptions] = None,
379+
stream_consumer_options: Optional[StreamConsumerOptions] = None,
380380
credit: Optional[int] = None,
381381
) -> Consumer:
382382
"""
@@ -385,7 +385,7 @@ def consumer(
385385
Args:
386386
destination: The address to consume from
387387
message_handler: Optional handler for processing messages
388-
stream_filter_options: Optional configuration for stream consumption
388+
stream_consumer_options: Optional configuration for stream consumption
389389
credit: Optional credit value for flow control
390390
391391
Returns:
@@ -394,12 +394,12 @@ def consumer(
394394
Raises:
395395
ArgumentOutOfRangeException: If destination address format is invalid
396396
"""
397-
if validate_address(destination) is False:
397+
if not validate_address(destination):
398398
raise ArgumentOutOfRangeException(
399399
"destination address must start with /queues or /exchanges"
400400
)
401401
consumer = Consumer(
402-
self._conn, destination, message_handler, stream_filter_options, credit
402+
self._conn, destination, message_handler, stream_consumer_options, credit
403403
)
404404
self._consumers.append(consumer)
405405
return consumer

rabbitmq_amqp_python_client/consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Literal, Optional, Union, cast
33

44
from .amqp_consumer_handler import AMQPMessagingHandler
5-
from .entities import StreamOptions
5+
from .entities import StreamConsumerOptions
66
from .options import (
77
ReceiverOptionUnsettled,
88
ReceiverOptionUnsettledWithFilters,
@@ -29,7 +29,7 @@ class Consumer:
2929
_conn (BlockingConnection): The underlying connection to RabbitMQ
3030
_addr (str): The address to consume from
3131
_handler (Optional[MessagingHandler]): Optional message handling callback
32-
_stream_options (Optional[StreamOptions]): Configuration for stream consumption
32+
_stream_options (Optional[StreamConsumerOptions]): Configuration for stream consumption
3333
_credit (Optional[int]): Flow control credit value
3434
"""
3535

@@ -38,7 +38,7 @@ def __init__(
3838
conn: BlockingConnection,
3939
addr: str,
4040
handler: Optional[AMQPMessagingHandler] = None,
41-
stream_options: Optional[StreamOptions] = None,
41+
stream_options: Optional[StreamConsumerOptions] = None,
4242
credit: Optional[int] = None,
4343
):
4444
"""

rabbitmq_amqp_python_client/entities.py

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,32 @@ class ExchangeToExchangeBindingSpecification:
149149
binding_key: Optional[str] = None
150150

151151

152-
class StreamOptions:
152+
"""
153+
StreamFilterOptions defines the filtering options for a stream consumer.
154+
for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
155+
"""
156+
157+
158+
class StreamFilterOptions:
159+
values: Optional[list[str]] = None
160+
match_unfiltered: bool = False
161+
application_properties: Optional[dict[str, Any]] = None
162+
sql: str = ""
163+
164+
def __init__(
165+
self,
166+
values: Optional[list[str]] = None,
167+
match_unfiltered: bool = False,
168+
application_properties: Optional[dict[str, Any]] = None,
169+
sql: str = "",
170+
):
171+
self.values = values
172+
self.match_unfiltered = match_unfiltered
173+
self.application_properties = application_properties
174+
self.sql = sql
175+
176+
177+
class StreamConsumerOptions:
153178
"""
154179
Configuration options for stream queues.
155180
@@ -161,29 +186,36 @@ class StreamOptions:
161186
Args:
162187
offset_specification: Either an OffsetSpecification enum value or
163188
an integer offset
164-
filters: List of filter strings to apply to the stream
189+
filter_options: Filter options for the stream consumer. See StreamFilterOptions
165190
"""
166191

167192
def __init__(
168193
self,
169194
offset_specification: Optional[Union[OffsetSpecification, int]] = None,
170-
filters: Optional[list[str]] = None,
171-
filter_match_unfiltered: bool = False,
195+
filter_options: Optional[StreamFilterOptions] = None,
172196
):
173197

174-
if offset_specification is None and filters is None:
198+
self.streamFilterOptions = filter_options
199+
200+
if offset_specification is None and self.streamFilterOptions is None:
175201
raise ValidationCodeException(
176202
"At least one between offset_specification and filters must be set when setting up filtering"
177203
)
178204
self._filter_set: Dict[symbol, Described] = {}
179205
if offset_specification is not None:
180206
self._offset(offset_specification)
181207

182-
if filters is not None:
183-
self._filter_values(filters)
184-
185-
if filter_match_unfiltered is True:
186-
self._filter_match_unfiltered(filter_match_unfiltered)
208+
if (
209+
self.streamFilterOptions is not None
210+
and self.streamFilterOptions.values is not None
211+
):
212+
self._filter_values(self.streamFilterOptions.values)
213+
214+
if (
215+
self.streamFilterOptions is not None
216+
and self.streamFilterOptions.match_unfiltered
217+
):
218+
self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered)
187219

188220
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
189221
"""

rabbitmq_amqp_python_client/options.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .entities import StreamOptions
1+
from .entities import StreamConsumerOptions
22
from .qpid.proton._data import ( # noqa: E402
33
PropertyDict,
44
symbol,
@@ -68,7 +68,7 @@ def test(self, link: Link) -> bool:
6868

6969

7070
class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore
71-
def __init__(self, addr: str, filter_options: StreamOptions):
71+
def __init__(self, addr: str, filter_options: StreamConsumerOptions):
7272
super().__init__(filter_options.filter_set())
7373
self._addr = addr
7474

rabbitmq_amqp_python_client/qpid/proton/_message.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def __init__(
117117
self._msg = pn_message()
118118
self.instructions = None
119119
self.annotations = None
120-
self.properties = None
120+
self.application_properties = None
121121
self.body = body
122122
self.inferred = inferred
123123

@@ -149,7 +149,7 @@ def _check_property_keys(self) -> None:
149149
# We cannot make changes to the dict while iterating, so we
150150
# must save and make the changes afterwards
151151
changed_keys = []
152-
for k in self.properties.keys():
152+
for k in self.application_properties.keys():
153153
if isinstance(k, str):
154154
# strings and their subclasses
155155
if type(k) is symbol or type(k) is char:
@@ -169,9 +169,12 @@ def _check_property_keys(self) -> None:
169169
)
170170
# Make the key changes
171171
for old_key, new_key in changed_keys:
172-
self.properties[new_key] = self.properties.pop(old_key)
172+
self.application_properties[new_key] = self.application_properties.pop(
173+
old_key
174+
)
173175

174176
def _pre_encode(self) -> None:
177+
175178
inst = Data(pn_message_instructions(self._msg))
176179
ann = Data(pn_message_annotations(self._msg))
177180
props = Data(pn_message_properties(self._msg))
@@ -184,9 +187,9 @@ def _pre_encode(self) -> None:
184187
if self.annotations is not None:
185188
ann.put_object(self.annotations)
186189
props.clear()
187-
if self.properties is not None:
190+
if self.application_properties is not None:
188191
self._check_property_keys()
189-
props.put_object(self.properties)
192+
props.put_object(self.application_properties)
190193
body.clear()
191194
if self.body is not None:
192195
body.put_object(self.body)
@@ -206,9 +209,9 @@ def _post_decode(self) -> None:
206209
else:
207210
self.annotations = None
208211
if props.next():
209-
self.properties = props.get_object()
212+
self.application_properties = props.get_object()
210213
else:
211-
self.properties = None
214+
self.application_properties = None
212215
if body.next():
213216
self.body = body.get_object()
214217
else:
@@ -222,7 +225,7 @@ def clear(self) -> None:
222225
pn_message_clear(self._msg)
223226
self.instructions = None
224227
self.annotations = None
225-
self.properties = None
228+
self.application_properties = None
226229
self.body = None
227230

228231
@property
@@ -641,7 +644,7 @@ def __repr__(self) -> str:
641644
"reply_to_group_id",
642645
"instructions",
643646
"annotations",
644-
"properties",
647+
"application_properties",
645648
"body",
646649
):
647650
value = getattr(self, attr)

0 commit comments

Comments
 (0)