Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ To run TLS you need to:

Read more about the issue [here](https://stackoverflow.com/questions/44979947/python-qpid-proton-for-mac-using-amqps)

### SSL Problems in local enviroment

If when running tests, this exceptions is raised by the proton library: `SSLUnavailable`:
``` bash
pip uninstall python-qpid-proton -y

sudo apt-get update
sudo apt-get install -y swig cmake build-essential libssl-dev pkg-config

export PKG_CONFIG_PATH=/usr/lib/x86_64-linux-gnu/pkgconfig
export CFLAGS="-I/usr/include/openssl"
export LDFLAGS="-L/usr/lib/x86_64-linux-gnu"

pip install "python-qpid-proton>=0.39.0,<0.40.0" --no-binary python-qpid-proton --verbose --no-cache-dir
```



Expand Down
123 changes: 123 additions & 0 deletions examples/getting_started/getting_started_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# type: ignore

import asyncio

from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
AsyncEnvironment,
Converter,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Message,
OutcomeState,
QuorumQueueSpecification,
)

MESSAGES_TO_PUBLISH = 100


class StopConsumerException(Exception):
"""Exception to signal consumer should stop"""
pass


class MyMessageHandler(AMQPMessagingHandler):

def __init__(self):
super().__init__()
self._count = 0

def on_amqp_message(self, event: Event):
print(
"received message: {} ".format(
Converter.bytes_to_string(event.message.body)
)
)

self.delivery_context.accept(event)
self._count = self._count + 1
print("count " + str(self._count))

if self._count == MESSAGES_TO_PUBLISH:
print("received all messages")
# Stop the consumer by raising an exception
raise StopConsumerException("All messages consumed")

def on_connection_closed(self, event: Event):
print("connection closed")

def on_link_closed(self, event: Event) -> None:
print("link closed")


async def main():
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"

print("connection to amqp server")
async with AsyncEnvironment(
uri="amqp://guest:guest@localhost:5672/"
) as environment:
async with await environment.connection() as connection:
async with await connection.management() as management:
print("declaring exchange and queue")
await management.declare_exchange(ExchangeSpecification(name=exchange_name))
await management.declare_queue(
QuorumQueueSpecification(name=queue_name)
)

print("binding queue to exchange")
bind_name = await management.bind(
ExchangeToQueueBindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
binding_key=routing_key,
)
)

addr = AddressHelper.exchange_address(exchange_name, routing_key)
addr_queue = AddressHelper.queue_address(queue_name)

print("create a publisher and publish a test message")
async with await connection.publisher(addr) as publisher:
print("purging the queue")
messages_purged = await management.purge_queue(queue_name)
print("messages purged: " + str(messages_purged))

# publish messages
for i in range(MESSAGES_TO_PUBLISH):
status = await publisher.publish(
Message(body=Converter.string_to_bytes("test message {} ".format(i)))
)
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted")

print("create a consumer and consume the test message - press control + c to terminate to consume")
handler = MyMessageHandler()
async with await connection.consumer(addr_queue, message_handler=handler) as consumer:
# Run the consumer in a background task
consumer_task = asyncio.create_task(consumer.run())

try:
# Wait for the consumer to finish (e.g., by raising the exception)
await consumer_task
except StopConsumerException as e:
print(f"Consumer stopped: {e}")
except KeyboardInterrupt:
print("consumption interrupted by user, stopping consumer...")
await consumer.stop()

print("unbind")
await management.unbind(bind_name)

print("delete queue")
await management.delete_queue(queue_name)

print("delete exchange")
await management.delete_exchange(exchange_name)

if __name__ == "__main__":
asyncio.run(main())
38 changes: 36 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ pytest = "^8.3.4"
black = "^24.3.0"
python-qpid-proton = "^0.39.0"
requests = "^2.31.0"
pytest-asyncio = "^1.2.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.pytest.ini_options]
asyncio_mode = "auto"
12 changes: 12 additions & 0 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

from .address_helper import AddressHelper
from .amqp_consumer_handler import AMQPMessagingHandler
from .asyncio import (
AsyncConnection,
AsyncConsumer,
AsyncEnvironment,
AsyncManagement,
AsyncPublisher,
)
from .common import ExchangeType, QueueType
from .connection import Connection
from .consumer import Consumer
Expand Down Expand Up @@ -99,4 +106,9 @@
"RecoveryConfiguration",
"OAuth2Options",
"Converter",
"AsyncConnection",
"AsyncConsumer",
"AsyncPublisher",
"AsyncManagement",
"AsyncEnvironment",
]
13 changes: 13 additions & 0 deletions rabbitmq_amqp_python_client/asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .connection import AsyncConnection
from .consumer import AsyncConsumer
from .enviroment import AsyncEnvironment
from .management import AsyncManagement
from .publisher import AsyncPublisher

__all__ = [
"AsyncConnection",
"AsyncConsumer",
"AsyncManagement",
"AsyncPublisher",
"AsyncEnvironment",
]
Loading
Loading