Skip to content

Commit 853e20e

Browse files
committed
refactor
the stream consumer options and the stream filter options. This implementation is similar to the golang implementation. So the idea is to have the same API ( as much as possible ) for all the amqp 1.0. clients. Add a new class StreamFilterOptions to configure only the filtering and leave the StreamConsumerOptions for offset and other future implementations Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 43159ae commit 853e20e

File tree

8 files changed

+88
-57
lines changed

8 files changed

+88
-57
lines changed

examples/streams/example_with_streams.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
Event,
1111
Message,
1212
OffsetSpecification,
13-
StreamOptions,
13+
StreamConsumerOptions,
1414
StreamSpecification,
1515
)
1616

@@ -104,9 +104,7 @@ def main() -> None:
104104
message_handler=MyMessageHandler(),
105105
# can be first, last, next or an offset long
106106
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
107-
stream_filter_options=StreamOptions(
108-
offset_specification=OffsetSpecification.first, filters=["banana"]
109-
),
107+
stream_consumer_options=StreamConsumerOptions(offset_specification=OffsetSpecification.first),
110108
)
111109
print(
112110
"create a consumer and consume the test message - press control + c to terminate to consume"

rabbitmq_amqp_python_client/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
OAuth2Options,
1414
OffsetSpecification,
1515
RecoveryConfiguration,
16-
StreamOptions,
16+
StreamConsumerOptions,
1717
)
1818
from .environment import Environment
1919
from .exceptions import (
@@ -85,7 +85,7 @@
8585
"CurrentUserStore",
8686
"PKCS12Store",
8787
"ConnectionClosed",
88-
"StreamOptions",
88+
"StreamConsumerOptions",
8989
"OffsetSpecification",
9090
"OutcomeState",
9191
"Environment",

rabbitmq_amqp_python_client/connection.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from .entities import (
1919
OAuth2Options,
2020
RecoveryConfiguration,
21-
StreamOptions,
21+
StreamConsumerOptions,
2222
)
2323
from .exceptions import (
2424
ArgumentOutOfRangeException,
@@ -363,7 +363,7 @@ def publisher(self, destination: str = "") -> Publisher:
363363
ArgumentOutOfRangeException: If destination address format is invalid
364364
"""
365365
if destination != "":
366-
if validate_address(destination) is False:
366+
if not validate_address(destination):
367367
raise ArgumentOutOfRangeException(
368368
"destination address must start with /queues or /exchanges"
369369
)
@@ -376,7 +376,7 @@ def consumer(
376376
self,
377377
destination: str,
378378
message_handler: Optional[MessagingHandler] = None,
379-
stream_filter_options: Optional[StreamOptions] = None,
379+
stream_consumer_options: Optional[StreamConsumerOptions] = None,
380380
credit: Optional[int] = None,
381381
) -> Consumer:
382382
"""
@@ -385,7 +385,7 @@ def consumer(
385385
Args:
386386
destination: The address to consume from
387387
message_handler: Optional handler for processing messages
388-
stream_filter_options: Optional configuration for stream consumption
388+
stream_consumer_options: Optional configuration for stream consumption
389389
credit: Optional credit value for flow control
390390
391391
Returns:
@@ -394,12 +394,12 @@ def consumer(
394394
Raises:
395395
ArgumentOutOfRangeException: If destination address format is invalid
396396
"""
397-
if validate_address(destination) is False:
397+
if not validate_address(destination):
398398
raise ArgumentOutOfRangeException(
399399
"destination address must start with /queues or /exchanges"
400400
)
401401
consumer = Consumer(
402-
self._conn, destination, message_handler, stream_filter_options, credit
402+
self._conn, destination, message_handler, stream_consumer_options, credit
403403
)
404404
self._consumers.append(consumer)
405405
return consumer

rabbitmq_amqp_python_client/consumer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Literal, Optional, Union, cast
33

44
from .amqp_consumer_handler import AMQPMessagingHandler
5-
from .entities import StreamOptions
5+
from .entities import StreamConsumerOptions
66
from .options import (
77
ReceiverOptionUnsettled,
88
ReceiverOptionUnsettledWithFilters,
@@ -29,7 +29,7 @@ class Consumer:
2929
_conn (BlockingConnection): The underlying connection to RabbitMQ
3030
_addr (str): The address to consume from
3131
_handler (Optional[MessagingHandler]): Optional message handling callback
32-
_stream_options (Optional[StreamOptions]): Configuration for stream consumption
32+
_stream_options (Optional[StreamConsumerOptions]): Configuration for stream consumption
3333
_credit (Optional[int]): Flow control credit value
3434
"""
3535

@@ -38,7 +38,7 @@ def __init__(
3838
conn: BlockingConnection,
3939
addr: str,
4040
handler: Optional[AMQPMessagingHandler] = None,
41-
stream_options: Optional[StreamOptions] = None,
41+
stream_options: Optional[StreamConsumerOptions] = None,
4242
credit: Optional[int] = None,
4343
):
4444
"""

rabbitmq_amqp_python_client/entities.py

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,26 @@ class ExchangeToExchangeBindingSpecification:
149149
binding_key: Optional[str] = None
150150

151151

152-
class StreamOptions:
152+
class StreamFilterOptions:
153+
values: Optional[list[str]] = (None,)
154+
match_unfiltered: bool = (False,)
155+
application_properties: dict = (field(default_factory=dict),)
156+
sql: str = ""
157+
158+
def __init__(
159+
self,
160+
values: Optional[list[str]] = None,
161+
match_unfiltered: bool = False,
162+
application_properties: Optional[dict] = None,
163+
sql: str = "",
164+
):
165+
self.values = values
166+
self.match_unfiltered = match_unfiltered
167+
self.application_properties = application_properties
168+
self.sql = sql
169+
170+
171+
class StreamConsumerOptions:
153172
"""
154173
Configuration options for stream queues.
155174
@@ -167,23 +186,30 @@ class StreamOptions:
167186
def __init__(
168187
self,
169188
offset_specification: Optional[Union[OffsetSpecification, int]] = None,
170-
filters: Optional[list[str]] = None,
171-
filter_match_unfiltered: bool = False,
189+
filter_options: Optional[StreamFilterOptions] = None,
172190
):
173191

174-
if offset_specification is None and filters is None:
192+
self.streamFilterOptions = filter_options
193+
194+
if offset_specification is None and self.streamFilterOptions is None:
175195
raise ValidationCodeException(
176196
"At least one between offset_specification and filters must be set when setting up filtering"
177197
)
178198
self._filter_set: Dict[symbol, Described] = {}
179199
if offset_specification is not None:
180200
self._offset(offset_specification)
181201

182-
if filters is not None:
183-
self._filter_values(filters)
184-
185-
if filter_match_unfiltered is True:
186-
self._filter_match_unfiltered(filter_match_unfiltered)
202+
if (
203+
self.streamFilterOptions is not None
204+
and self.streamFilterOptions.values is not None
205+
):
206+
self._filter_values(self.streamFilterOptions.values)
207+
208+
if (
209+
self.streamFilterOptions is not None
210+
and self.streamFilterOptions.match_unfiltered
211+
):
212+
self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered)
187213

188214
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
189215
"""

rabbitmq_amqp_python_client/options.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .entities import StreamOptions
1+
from .entities import StreamConsumerOptions
22
from .qpid.proton._data import ( # noqa: E402
33
PropertyDict,
44
symbol,
@@ -68,7 +68,7 @@ def test(self, link: Link) -> bool:
6868

6969

7070
class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore
71-
def __init__(self, addr: str, filter_options: StreamOptions):
71+
def __init__(self, addr: str, filter_options: StreamConsumerOptions):
7272
super().__init__(filter_options.filter_set())
7373
self._addr = addr
7474

rabbitmq_amqp_python_client/qpid/proton/_message.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def __init__(
117117
self._msg = pn_message()
118118
self.instructions = None
119119
self.annotations = None
120-
self.properties = None
120+
self.application_properties = None
121121
self.body = body
122122
self.inferred = inferred
123123

@@ -149,7 +149,7 @@ def _check_property_keys(self) -> None:
149149
# We cannot make changes to the dict while iterating, so we
150150
# must save and make the changes afterwards
151151
changed_keys = []
152-
for k in self.properties.keys():
152+
for k in self.application_properties.keys():
153153
if isinstance(k, str):
154154
# strings and their subclasses
155155
if type(k) is symbol or type(k) is char:
@@ -169,9 +169,12 @@ def _check_property_keys(self) -> None:
169169
)
170170
# Make the key changes
171171
for old_key, new_key in changed_keys:
172-
self.properties[new_key] = self.properties.pop(old_key)
172+
self.application_properties[new_key] = self.application_properties.pop(
173+
old_key
174+
)
173175

174176
def _pre_encode(self) -> None:
177+
175178
inst = Data(pn_message_instructions(self._msg))
176179
ann = Data(pn_message_annotations(self._msg))
177180
props = Data(pn_message_properties(self._msg))
@@ -184,9 +187,9 @@ def _pre_encode(self) -> None:
184187
if self.annotations is not None:
185188
ann.put_object(self.annotations)
186189
props.clear()
187-
if self.properties is not None:
190+
if self.application_properties is not None:
188191
self._check_property_keys()
189-
props.put_object(self.properties)
192+
props.put_object(self.application_properties)
190193
body.clear()
191194
if self.body is not None:
192195
body.put_object(self.body)
@@ -206,9 +209,9 @@ def _post_decode(self) -> None:
206209
else:
207210
self.annotations = None
208211
if props.next():
209-
self.properties = props.get_object()
212+
self.application_properties = props.get_object()
210213
else:
211-
self.properties = None
214+
self.application_properties = None
212215
if body.next():
213216
self.body = body.get_object()
214217
else:
@@ -222,7 +225,7 @@ def clear(self) -> None:
222225
pn_message_clear(self._msg)
223226
self.instructions = None
224227
self.annotations = None
225-
self.properties = None
228+
self.application_properties = None
226229
self.body = None
227230

228231
@property
@@ -641,7 +644,7 @@ def __repr__(self) -> str:
641644
"reply_to_group_id",
642645
"instructions",
643646
"annotations",
644-
"properties",
647+
"application_properties",
645648
"body",
646649
):
647650
value = getattr(self, attr)

0 commit comments

Comments
 (0)