Skip to content

Commit 404f117

Browse files
committed
example
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 70621df commit 404f117

File tree

3 files changed

+146
-1
lines changed

3 files changed

+146
-1
lines changed

examples/streams_with_filters/example_streams_with_filters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def main() -> None:
7272
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
7373
"""
7474

75-
queue_name = "stream-example-with_filtering-queue"
75+
queue_name = "stream-example-with-message-properties-filter-queue"
7676
logger.info("Creating connection")
7777
environment = Environment("amqp://guest:guest@localhost:5672/")
7878
connection = create_connection(environment)
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# type: ignore
2+
import logging
3+
4+
from rabbitmq_amqp_python_client import (
5+
AddressHelper,
6+
AMQPMessagingHandler,
7+
Connection,
8+
ConnectionClosed,
9+
Converter,
10+
Environment,
11+
Event,
12+
Message,
13+
OffsetSpecification,
14+
StreamConsumerOptions,
15+
StreamFilterOptions,
16+
StreamSpecification,
17+
)
18+
19+
MESSAGES_TO_PUBLISH = 100
20+
21+
22+
class MyMessageHandler(AMQPMessagingHandler):
23+
24+
def __init__(self):
25+
super().__init__()
26+
self._count = 0
27+
28+
def on_amqp_message(self, event: Event):
29+
# only messages with banana filters and with subject yellow
30+
# and application property from = italy get received
31+
self._count = self._count + 1
32+
logger.info(
33+
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
34+
Converter.bytes_to_string(event.message.body),
35+
event.message.subject,
36+
event.message.application_properties,
37+
self._count,
38+
)
39+
)
40+
self.delivery_context.accept(event)
41+
42+
def on_connection_closed(self, event: Event):
43+
# if you want you can add cleanup operations here
44+
print("connection closed")
45+
46+
def on_link_closed(self, event: Event) -> None:
47+
# if you want you can add cleanup operations here
48+
print("link closed")
49+
50+
51+
def create_connection(environment: Environment) -> Connection:
52+
connection = environment.connection()
53+
connection.dial()
54+
55+
return connection
56+
57+
58+
logging.basicConfig()
59+
logger = logging.getLogger("[streams_with_filters]")
60+
logger.setLevel(logging.INFO)
61+
62+
63+
def main() -> None:
64+
"""
65+
In this example we create a stream queue and a consumer with SQL filter
66+
67+
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
68+
"""
69+
70+
queue_name = "stream-example-with-sql-filter-queue"
71+
logger.info("Creating connection")
72+
environment = Environment("amqp://guest:guest@localhost:5672/")
73+
connection = create_connection(environment)
74+
management = connection.management()
75+
# delete the queue if it exists
76+
management.delete_queue(queue_name)
77+
# create a stream queue
78+
management.declare_queue(StreamSpecification(name=queue_name))
79+
80+
addr_queue = AddressHelper.queue_address(queue_name)
81+
82+
consumer_connection = create_connection(environment)
83+
sql = (
84+
"properties.subject LIKE '%in_the_filter%' "
85+
"AND a_in_the_filter_key = 'a_in_the_filter_value'"
86+
)
87+
88+
consumer = consumer_connection.consumer(
89+
addr_queue,
90+
message_handler=MyMessageHandler(),
91+
stream_consumer_options=StreamConsumerOptions(
92+
offset_specification=OffsetSpecification.first,
93+
filter_options=StreamFilterOptions(sql=sql),
94+
),
95+
)
96+
print(
97+
"create a consumer and consume the test message - press control + c to terminate to consume"
98+
)
99+
100+
# print("create a publisher and publish a test message")
101+
publisher = connection.publisher(addr_queue)
102+
103+
# publish messages won't match the filter
104+
for i in range(MESSAGES_TO_PUBLISH):
105+
publisher.publish(Message(Converter.string_to_bytes(body="apple: " + str(i))))
106+
107+
# publish messages that will match the filter
108+
for i in range(MESSAGES_TO_PUBLISH):
109+
msqMatch = Message(
110+
body=Converter.string_to_bytes("the_right_one_sql"),
111+
# will match due of %
112+
subject="something_in_the_filter_{}".format(i),
113+
application_properties={"a_in_the_filter_key": "a_in_the_filter_value"},
114+
)
115+
publisher.publish(msqMatch)
116+
117+
publisher.close()
118+
119+
while True:
120+
try:
121+
consumer.run()
122+
except KeyboardInterrupt:
123+
pass
124+
except ConnectionClosed:
125+
print("connection closed")
126+
continue
127+
except Exception as e:
128+
print("consumer exited for exception " + str(e))
129+
130+
break
131+
132+
#
133+
logger.info("consumer exited, deleting queue")
134+
management.delete_queue(queue_name)
135+
136+
print("closing connections")
137+
management.close()
138+
print("after management closing")
139+
environment.close()
140+
print("after connection closing")
141+
142+
143+
if __name__ == "__main__":
144+
main()

tests/test_streams.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,7 @@ def test_stream_filter_sql(connection: Connection, environment: Environment) ->
579579
)
580580
publisher.publish(msg)
581581

582+
# the only one that will match
582583
msqMatch = Message(
583584
body=Converter.string_to_bytes("the_right_one_sql"),
584585
subject="something_in_the_filter",

0 commit comments

Comments
 (0)