Skip to content

Commit 46d3a2e

Browse files
committed
Add examples
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 7f8919a commit 46d3a2e

File tree

3 files changed

+156
-2
lines changed

3 files changed

+156
-2
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ rabbitmq-server-stop:
99

1010
format:
1111
poetry run isort --skip rabbitmq_amqp_python_client/qpid --skip .venv .
12-
poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid .
1312
poetry run black rabbitmq_amqp_python_client/
1413
poetry run black tests/
1514
poetry run flake8 --exclude=venv,.venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
15+
poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid .
1616

1717
test: format
1818
poetry run pytest .

examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ Client examples
44
- [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection
55
- [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection
66
- [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities
7-
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
7+
- [Oauth](./oauth/oAuth2.py) - Connection through Oauth token
8+
- [Streams with filters](./streams_with_filters/example_streams_with_filters.py) - Example supporting stream capabilities with filters
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
# type: ignore
2+
import logging
3+
import time
4+
5+
from rabbitmq_amqp_python_client import (
6+
AddressHelper,
7+
AMQPMessagingHandler,
8+
Connection,
9+
ConnectionClosed,
10+
Converter,
11+
Environment,
12+
Event,
13+
Message,
14+
MessageProperties,
15+
OffsetSpecification,
16+
StreamConsumerOptions,
17+
StreamFilterOptions,
18+
StreamSpecification,
19+
)
20+
21+
MESSAGES_TO_PUBLISH = 100
22+
23+
24+
class MyMessageHandler(AMQPMessagingHandler):
25+
26+
def __init__(self):
27+
super().__init__()
28+
self._count = 0
29+
30+
def on_amqp_message(self, event: Event):
31+
# only messages with banana filters and with subject yellow
32+
self._count = self._count + 1
33+
logger.info("Received message: {}, subject {}.[Total Consumed: {}]".
34+
format(Converter.bytes_to_string(event.message.body), event.message.subject, self._count))
35+
self.delivery_context.accept(event)
36+
37+
def on_connection_closed(self, event: Event):
38+
# if you want you can add cleanup operations here
39+
print("connection closed")
40+
41+
def on_link_closed(self, event: Event) -> None:
42+
# if you want you can add cleanup operations here
43+
print("link closed")
44+
45+
46+
def create_connection(environment: Environment) -> Connection:
47+
connection = environment.connection()
48+
connection.dial()
49+
50+
return connection
51+
52+
53+
logging.basicConfig()
54+
logger = logging.getLogger("[streams_with_filters]")
55+
logger.setLevel(logging.INFO)
56+
57+
58+
def main() -> None:
59+
"""
60+
In this example we create a stream queue and a consumer with filtering options.
61+
The example combines two filters:
62+
- filter value: banana
63+
- subject: yellow
64+
65+
See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions
66+
"""
67+
68+
queue_name = "stream-example-with_filtering-queue"
69+
logger.info("Creating connection")
70+
environment = Environment("amqp://guest:guest@localhost:5672/")
71+
connection = create_connection(environment)
72+
management = connection.management()
73+
# delete the queue if it exists
74+
management.delete_queue(queue_name)
75+
# create a stream queue
76+
management.declare_queue(StreamSpecification(name=queue_name))
77+
78+
addr_queue = AddressHelper.queue_address(queue_name)
79+
80+
consumer_connection = create_connection(environment)
81+
82+
consumer = consumer_connection.consumer(
83+
addr_queue,
84+
message_handler=MyMessageHandler(),
85+
86+
# the consumer will only receive messages with filter value banana and subject yellow
87+
88+
stream_consumer_options=StreamConsumerOptions(
89+
offset_specification=OffsetSpecification.first,
90+
filter_options=StreamFilterOptions(
91+
values=["banana"],
92+
message_properties=MessageProperties(subject="yellow", )))
93+
94+
)
95+
print(
96+
"create a consumer and consume the test message - press control + c to terminate to consume"
97+
)
98+
99+
# print("create a publisher and publish a test message")
100+
publisher = connection.publisher(addr_queue)
101+
102+
# publish with a filter of apple
103+
for i in range(MESSAGES_TO_PUBLISH):
104+
color = "green" if i % 2 == 0 else "yellow"
105+
publisher.publish(
106+
Message(
107+
Converter.string_to_bytes(body="apple: " + str(i)),
108+
annotations={"x-stream-filter-value": "apple"},
109+
subject=color,
110+
)
111+
)
112+
113+
time.sleep(0.5) # wait a bit to ensure messages are published in different chunks
114+
115+
# publish with a filter of banana
116+
for i in range(MESSAGES_TO_PUBLISH):
117+
color = "green" if i % 2 == 0 else "yellow"
118+
publisher.publish(
119+
Message(
120+
body=Converter.string_to_bytes("banana: " + str(i)),
121+
annotations={"x-stream-filter-value": "banana"},
122+
subject=color,
123+
)
124+
)
125+
126+
publisher.close()
127+
128+
while True:
129+
try:
130+
consumer.run()
131+
except KeyboardInterrupt:
132+
pass
133+
except ConnectionClosed:
134+
print("connection closed")
135+
continue
136+
except Exception as e:
137+
print("consumer exited for exception " + str(e))
138+
139+
break
140+
141+
#
142+
logger.info("consumer exited, deleting queue")
143+
management.delete_queue(queue_name)
144+
145+
print("closing connections")
146+
management.close()
147+
print("after management closing")
148+
environment.close()
149+
print("after connection closing")
150+
151+
152+
if __name__ == "__main__":
153+
main()

0 commit comments

Comments
 (0)