Skip to content

Commit 3c6685d

Browse files
committed
implement application properties filter
complete: #42 Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 4146b5f commit 3c6685d

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

examples/streams_with_filters/example_streams_with_filters.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ def __init__(self):
2929

3030
def on_amqp_message(self, event: Event):
3131
# only messages with banana filters and with subject yellow
32+
# and application property from = italy get received
3233
self._count = self._count + 1
3334
logger.info(
34-
"Received message: {}, subject {}.[Total Consumed: {}]".format(
35+
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
3536
Converter.bytes_to_string(event.message.body),
3637
event.message.subject,
38+
event.message.application_properties,
3739
self._count,
3840
)
3941
)
@@ -88,13 +90,15 @@ def main() -> None:
8890
addr_queue,
8991
message_handler=MyMessageHandler(),
9092
# the consumer will only receive messages with filter value banana and subject yellow
93+
# and application property from = italy
9194
stream_consumer_options=StreamConsumerOptions(
9295
offset_specification=OffsetSpecification.first,
9396
filter_options=StreamFilterOptions(
9497
values=["banana"],
9598
message_properties=MessageProperties(
9699
subject="yellow",
97100
),
101+
application_properties={"from": "italy"}
98102
),
99103
),
100104
)
@@ -108,11 +112,13 @@ def main() -> None:
108112
# publish with a filter of apple
109113
for i in range(MESSAGES_TO_PUBLISH):
110114
color = "green" if i % 2 == 0 else "yellow"
115+
from_value = "italy" if i % 3 == 0 else "spain"
111116
publisher.publish(
112117
Message(
113118
Converter.string_to_bytes(body="apple: " + str(i)),
114119
annotations={"x-stream-filter-value": "apple"},
115120
subject=color,
121+
application_properties={"from": from_value},
116122
)
117123
)
118124

@@ -121,11 +127,13 @@ def main() -> None:
121127
# publish with a filter of banana
122128
for i in range(MESSAGES_TO_PUBLISH):
123129
color = "green" if i % 2 == 0 else "yellow"
130+
from_value = "italy" if i % 3 == 0 else "spain"
124131
publisher.publish(
125132
Message(
126133
body=Converter.string_to_bytes("banana: " + str(i)),
127134
annotations={"x-stream-filter-value": "banana"},
128135
subject=color,
136+
application_properties={"from": from_value},
129137
)
130138
)
131139

rabbitmq_amqp_python_client/entities.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
1212
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
1313
AMQP_PROPERTIES_FILTER = "amqp:properties-filter"
14+
AMQP_APPLICATION_PROPERTIES_FILTER = "amqp:application-properties-filter"
1415

1516

1617
@dataclass
@@ -252,6 +253,11 @@ def __init__(
252253

253254
if filter_options is not None and filter_options.message_properties is not None:
254255
self._filter_message_properties(filter_options.message_properties)
256+
if (
257+
filter_options is not None
258+
and filter_options.application_properties is not None
259+
):
260+
self._filter_application_properties(filter_options.application_properties)
255261

256262
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
257263
"""
@@ -316,6 +322,19 @@ def _filter_message_properties(
316322
symbol(AMQP_PROPERTIES_FILTER), filter_prop
317323
)
318324

325+
def _filter_application_properties(
326+
self, application_properties: Optional[dict[str, Any]]
327+
) -> None:
328+
app_prop = {}
329+
if application_properties is not None:
330+
for key, value in application_properties.items():
331+
app_prop[key] = value
332+
333+
if len(app_prop) > 0:
334+
self._filter_set[symbol(AMQP_APPLICATION_PROPERTIES_FILTER)] = (
335+
Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop)
336+
)
337+
319338
def filter_set(self) -> Dict[symbol, Described]:
320339
"""
321340
Get the current filter set configuration.

0 commit comments

Comments
 (0)