Skip to content

Commit 683bb12

Browse files
committed
Implement filters
based on message properites. Closes: #42 Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 36bb249 commit 683bb12

File tree

3 files changed

+183
-14
lines changed

3 files changed

+183
-14
lines changed

rabbitmq_amqp_python_client/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
ExchangeSpecification,
1111
ExchangeToExchangeBindingSpecification,
1212
ExchangeToQueueBindingSpecification,
13+
MessageProperties,
1314
OAuth2Options,
1415
OffsetSpecification,
1516
RecoveryConfiguration,
1617
StreamConsumerOptions,
18+
StreamFilterOptions,
1719
)
1820
from .environment import Environment
1921
from .exceptions import (
@@ -86,6 +88,8 @@
8688
"PKCS12Store",
8789
"ConnectionClosed",
8890
"StreamConsumerOptions",
91+
"StreamFilterOptions",
92+
"MessageProperties",
8993
"OffsetSpecification",
9094
"OutcomeState",
9195
"Environment",

rabbitmq_amqp_python_client/entities.py

Lines changed: 106 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass, field
2-
from datetime import timedelta
2+
from datetime import datetime, timedelta
33
from enum import Enum
44
from typing import Any, Dict, Optional, Union
55

@@ -10,6 +10,7 @@
1010
STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
1111
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
1212
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
13+
AMQP_PROPERTIES_FILTER = "amqp:properties-filter"
1314

1415

1516
@dataclass
@@ -149,6 +150,77 @@ class ExchangeToExchangeBindingSpecification:
149150
binding_key: Optional[str] = None
150151

151152

153+
class MessageProperties:
154+
def __init__(
155+
self,
156+
message_id: Optional[Any] = None,
157+
user_id: Optional[bytes] = None,
158+
to: Optional[str] = None,
159+
subject: Optional[str] = None,
160+
reply_to: Optional[str] = None,
161+
correlation_id: Optional[Any] = None,
162+
content_type: Optional[str] = None,
163+
content_encoding: Optional[str] = None,
164+
absolute_expiry_time: Optional[datetime] = None,
165+
creation_time: Optional[datetime] = None,
166+
group_id: Optional[str] = None,
167+
group_sequence: Optional[int] = None,
168+
reply_to_group_id: Optional[str] = None,
169+
):
170+
# Message-id, if set, uniquely identifies a message within the message system.
171+
# The message producer is usually responsible for setting the message-id in
172+
# such a way that it is assured to be globally unique. A broker MAY discard a
173+
# message as a duplicate if the value of the message-id matches that of a
174+
# previously received message sent to the same node.
175+
#
176+
# The value is restricted to the following types:
177+
# - int (for uint64), UUID, bytes, or str
178+
self.message_id: Optional[Any] = message_id
179+
180+
# The identity of the user responsible for producing the message.
181+
# The client sets this value, and it MAY be authenticated by intermediaries.
182+
self.user_id: Optional[bytes] = user_id
183+
184+
# The to field identifies the node that is the intended destination of the message.
185+
# On any given transfer this might not be the node at the receiving end of the link.
186+
self.to: Optional[str] = to
187+
188+
# A common field for summary information about the message content and purpose.
189+
self.subject: Optional[str] = subject
190+
191+
# The address of the node to send replies to.
192+
self.reply_to: Optional[str] = reply_to
193+
194+
# This is a client-specific id that can be used to mark or identify messages
195+
# between clients.
196+
#
197+
# The value is restricted to the following types:
198+
# - int (for uint64), UUID, bytes, or str
199+
self.correlation_id: Optional[Any] = correlation_id
200+
201+
# The RFC-2046 [RFC2046] MIME type for the message's application-data section (body).
202+
self.content_type: Optional[str] = content_type
203+
204+
# The content-encoding property is used as a modifier to the content-type.
205+
self.content_encoding: Optional[str] = content_encoding
206+
207+
# An absolute time when this message is considered to be expired.
208+
self.absolute_expiry_time: Optional[datetime] = absolute_expiry_time
209+
210+
# An absolute time when this message was created.
211+
self.creation_time: Optional[datetime] = creation_time
212+
213+
# Identifies the group the message belongs to.
214+
self.group_id: Optional[str] = group_id
215+
216+
# The relative position of this message within its group.
217+
self.group_sequence: Optional[int] = group_sequence
218+
219+
# This is a client-specific id that is used so that client can send replies to this
220+
# message to a specific group.
221+
self.reply_to_group_id: Optional[str] = reply_to_group_id
222+
223+
152224
"""
153225
StreamFilterOptions defines the filtering options for a stream consumer.
154226
for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
@@ -159,18 +231,21 @@ class StreamFilterOptions:
159231
values: Optional[list[str]] = None
160232
match_unfiltered: bool = False
161233
application_properties: Optional[dict[str, Any]] = None
234+
message_properties: Optional[MessageProperties] = None
162235
sql: str = ""
163236

164237
def __init__(
165238
self,
166239
values: Optional[list[str]] = None,
167240
match_unfiltered: bool = False,
168241
application_properties: Optional[dict[str, Any]] = None,
242+
message_properties: Optional[MessageProperties] = None,
169243
sql: str = "",
170244
):
171245
self.values = values
172246
self.match_unfiltered = match_unfiltered
173247
self.application_properties = application_properties
248+
self.message_properties = message_properties
174249
self.sql = sql
175250

176251

@@ -195,27 +270,22 @@ def __init__(
195270
filter_options: Optional[StreamFilterOptions] = None,
196271
):
197272

198-
self.streamFilterOptions = filter_options
273+
self._filter_set: Dict[symbol, Described] = {}
199274

200-
if offset_specification is None and self.streamFilterOptions is None:
275+
if offset_specification is None and filter_options is None:
201276
raise ValidationCodeException(
202277
"At least one between offset_specification and filters must be set when setting up filtering"
203278
)
204-
self._filter_set: Dict[symbol, Described] = {}
205279
if offset_specification is not None:
206280
self._offset(offset_specification)
207281

208-
if (
209-
self.streamFilterOptions is not None
210-
and self.streamFilterOptions.values is not None
211-
):
212-
self._filter_values(self.streamFilterOptions.values)
282+
if filter_options is not None and filter_options.values is not None:
283+
self._filter_values(filter_options.values)
213284

214-
if (
215-
self.streamFilterOptions is not None
216-
and self.streamFilterOptions.match_unfiltered
217-
):
218-
self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered)
285+
if filter_options is not None and filter_options.match_unfiltered:
286+
self._filter_match_unfiltered(filter_options.match_unfiltered)
287+
288+
self._filter_message_properties(filter_options.message_properties)
219289

220290
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
221291
"""
@@ -257,6 +327,28 @@ def _filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None:
257327
symbol(STREAM_FILTER_MATCH_UNFILTERED), filter_match_unfiltered
258328
)
259329

330+
def _filter_message_properties(self, message_properties: MessageProperties) -> None:
331+
"""
332+
Set application properties for filtering.
333+
334+
Args:
335+
message_properties: MessageProperties object containing application properties
336+
"""
337+
if message_properties.__dict__ is not None:
338+
# dictionary of symbols and described
339+
filter_prop: Dict[symbol, Any] = {}
340+
341+
for key, value in message_properties.__dict__.items():
342+
if key is not None:
343+
if message_properties.__dict__[key] is not None:
344+
# replace _ with - for the key
345+
filter_prop[symbol(key.replace("_", "-"))] = value
346+
347+
if len(filter_prop) > 0:
348+
self._filter_set[symbol(AMQP_PROPERTIES_FILTER)] = Described(
349+
symbol(AMQP_PROPERTIES_FILTER), filter_prop
350+
)
351+
260352
def filter_set(self) -> Dict[symbol, Described]:
261353
"""
262354
Get the current filter set configuration.

tests/test_streams.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
from rabbitmq_amqp_python_client import (
22
AddressHelper,
3+
AMQPMessagingHandler,
34
Connection,
5+
Converter,
46
Environment,
57
OffsetSpecification,
68
StreamConsumerOptions,
79
StreamSpecification,
810
)
911
from rabbitmq_amqp_python_client.entities import (
12+
MessageProperties,
1013
StreamFilterOptions,
1114
)
15+
from rabbitmq_amqp_python_client.qpid.proton import (
16+
Event,
17+
Message,
18+
)
1219

1320
from .conftest import (
1421
ConsumerTestException,
@@ -398,3 +405,69 @@ def test_stream_reconnection(
398405
consumer.close()
399406

400407
management.delete_queue(stream_name)
408+
409+
410+
class MyMessageHandlerMessagePropertiesFilter(AMQPMessagingHandler):
411+
def __init__(
412+
self,
413+
):
414+
super().__init__()
415+
416+
def on_message(self, event: Event):
417+
self.delivery_context.accept(event)
418+
assert event.message.subject == "important_15"
419+
assert event.message.group_id == "group_15"
420+
assert event.message.body == Converter.string_to_bytes("hello_15")
421+
raise ConsumerTestException("consumed")
422+
423+
424+
def test_stream_filter_message_properties(
425+
connection: Connection, environment: Environment
426+
) -> None:
427+
consumer = None
428+
stream_name = "test_stream_filter_message_properties"
429+
messages_to_send = 30
430+
431+
queue_specification = StreamSpecification(
432+
name=stream_name,
433+
)
434+
management = connection.management()
435+
management.declare_queue(queue_specification)
436+
437+
addr_queue = AddressHelper.queue_address(stream_name)
438+
439+
# consume and then publish
440+
try:
441+
connection_consumer = environment.connection()
442+
connection_consumer.dial()
443+
consumer = connection_consumer.consumer(
444+
addr_queue,
445+
message_handler=MyMessageHandlerMessagePropertiesFilter(),
446+
stream_consumer_options=StreamConsumerOptions(
447+
filter_options=StreamFilterOptions(
448+
message_properties=MessageProperties(
449+
subject="important_15", group_id="group_15"
450+
)
451+
)
452+
),
453+
)
454+
publisher = connection.publisher(addr_queue)
455+
for i in range(messages_to_send):
456+
msg = Message(
457+
body=Converter.string_to_bytes("hello_{}".format(i)),
458+
subject="important_{}".format(i),
459+
group_id="group_{}".format(i),
460+
)
461+
publisher.publish(msg)
462+
463+
publisher.close()
464+
465+
consumer.run()
466+
# ack to terminate the consumer
467+
except ConsumerTestException:
468+
pass
469+
470+
if consumer is not None:
471+
consumer.close()
472+
473+
management.delete_queue(stream_name)

0 commit comments

Comments
 (0)