Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e

### Managing disconnections

At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
You can use this callback to implement your own logic and eventually attempt a reconnection.
The client supports automatic reconnection with the ability to reconnect Managements, Producers and Consumers

You can check the [`reconnection example`](./examples/reconnection/reconnection_example.py) to see how to manage disconnections and
eventually attempt a reconnection
Expand Down
4 changes: 2 additions & 2 deletions examples/getting_started/getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self):
super().__init__()
self._count = 0

def on_message(self, event: Event):
def on_amqp_message(self, event: Event):
print("received message: " + str(event.message.body))

# accepting
Expand Down Expand Up @@ -147,7 +147,7 @@ def main() -> None:
consumer.close()
# once we finish consuming if we close the connection we need to create a new one
# connection = create_connection()
# management = connection.management()
management = connection.management()

print("unbind")
management.unbind(bind_name)
Expand Down
13 changes: 13 additions & 0 deletions examples/reconnection/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Automatic reconnection
===

You can use this example to test automatic reconnection.

The scenario is publishing and consuming a lot of messages in a queue.

From the RabbitMQ UI you can break a connection to see the automatic reconnection happening.

Same for Consumers.

In case of streams we connection will restart consuming from the last consumed offset.

125 changes: 27 additions & 98 deletions examples/reconnection/reconnection_example.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,22 @@
# type: ignore


import time
from dataclasses import dataclass
from typing import Optional

from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Consumer,
Environment,
Event,
ExchangeSpecification,
ExchangeToQueueBindingSpecification,
Management,
Message,
Publisher,
QuorumQueueSpecification,
)


# here we keep track of the objects we need to reconnect
@dataclass
class ConnectionConfiguration:
connection: Optional[Connection] = None
management: Optional[Management] = None
publisher: Optional[Publisher] = None
consumer: Optional[Consumer] = None


connection_configuration = ConnectionConfiguration()
MESSAGES_TO_PUBLSH = 50000

MESSAGES_TO_PUBLISH = 50000

# disconnection callback
# here you can cleanup or reconnect
def on_disconnection():

print("disconnected")
global environment
exchange_name = "test-exchange"
queue_name = "example-queue"
routing_key = "routing-key"

global connection_configuration

addr = AddressHelper.exchange_address(exchange_name, routing_key)
addr_queue = AddressHelper.queue_address(queue_name)

if connection_configuration.connection is not None:
connection_configuration.connection = create_connection()
if connection_configuration.management is not None:
connection_configuration.management = (
connection_configuration.connection.management()
)
if connection_configuration.publisher is not None:
connection_configuration.publisher = (
connection_configuration.connection.publisher(addr)
)
if connection_configuration.consumer is not None:
connection_configuration.consumer = (
connection_configuration.connection.consumer(
addr_queue, message_handler=MyMessageHandler()
)
)


environment = Environment(
uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
)
environment = Environment(uri="amqp://guest:guest@localhost:5672/", reconnect=True)


class MyMessageHandler(AMQPMessagingHandler):
Expand Down Expand Up @@ -102,7 +48,7 @@ def on_message(self, event: Event):

self._count = self._count + 1

if self._count == MESSAGES_TO_PUBLSH:
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here

Expand Down Expand Up @@ -136,29 +82,22 @@ def main() -> None:
queue_name = "example-queue"
routing_key = "routing-key"

global connection_configuration

print("connection to amqp server")
if connection_configuration.connection is None:
connection_configuration.connection = create_connection()

if connection_configuration.management is None:
connection_configuration.management = (
connection_configuration.connection.management()
)
connection = create_connection()
management = connection.management()
publisher = None
consumer = None

print("declaring exchange and queue")
connection_configuration.management.declare_exchange(
ExchangeSpecification(name=exchange_name)
)
management.declare_exchange(ExchangeSpecification(name=exchange_name))

connection_configuration.management.declare_queue(
management.declare_queue(
QuorumQueueSpecification(name=queue_name)
# QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter")
)

print("binding queue to exchange")
bind_name = connection_configuration.management.bind(
bind_name = management.bind(
ExchangeToQueueBindingSpecification(
source_exchange=exchange_name,
destination_queue=queue_name,
Expand All @@ -171,34 +110,32 @@ def main() -> None:
addr_queue = AddressHelper.queue_address(queue_name)

print("create a publisher and publish a test message")
if connection_configuration.publisher is None:
connection_configuration.publisher = (
connection_configuration.connection.publisher(addr)
)
if publisher is None:
publisher = connection.publisher(addr)

print("purging the queue")
messages_purged = connection_configuration.management.purge_queue(queue_name)
messages_purged = management.purge_queue(queue_name)

print("messages purged: " + str(messages_purged))
# management.close()

# publishing messages
while True:
for i in range(MESSAGES_TO_PUBLSH):
for i in range(MESSAGES_TO_PUBLISH):

if i % 1000 == 0:
print("published 1000 messages...")
try:
if connection_configuration.publisher is not None:
connection_configuration.publisher.publish(Message(body="test"))
if publisher is not None:
publisher.publish(Message(body="test"))
except ConnectionClosed:
print("publisher closing exception, resubmitting")
# publisher = connection.publisher(addr)
continue

print("closing publisher")
try:
if connection_configuration.publisher is not None:
connection_configuration.publisher.close()
if publisher is not None:
publisher.close()
except ConnectionClosed:
print("publisher closing exception, resubmitting")
continue
Expand All @@ -207,43 +144,35 @@ def main() -> None:
print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)
if connection_configuration.consumer is None:
connection_configuration.consumer = (
connection_configuration.connection.consumer(
addr_queue, message_handler=MyMessageHandler()
)
)
if consumer is None:
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())

while True:
try:
connection_configuration.consumer.run()
consumer.run()
except KeyboardInterrupt:
pass
except ConnectionClosed:
time.sleep(1)
continue
except Exception as e:
print("consumer exited for exception " + str(e))

break

print("cleanup")
connection_configuration.consumer.close()
# once we finish consuming if we close the connection we need to create a new one
# connection = create_connection()
# management = connection.management()
consumer.close()

print("unbind")
connection_configuration.management.unbind(bind_name)
management.unbind(bind_name)

print("delete queue")
connection_configuration.management.delete_queue(queue_name)
management.delete_queue(queue_name)

print("delete exchange")
connection_configuration.management.delete_exchange(exchange_name)
management.delete_exchange(exchange_name)

print("closing connections")
connection_configuration.management.close()
management.close()
print("after management closing")
environment.close()
print("after connection closing")
Expand Down
25 changes: 17 additions & 8 deletions examples/streams/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
AddressHelper,
AMQPMessagingHandler,
Connection,
ConnectionClosed,
Environment,
Event,
Message,
Expand All @@ -12,7 +13,7 @@
StreamSpecification,
)

MESSAGES_TO_PUBLISH = 100
MESSAGES_TO_PUBLISH = 1


class MyMessageHandler(AMQPMessagingHandler):
Expand All @@ -21,7 +22,7 @@ def __init__(self):
super().__init__()
self._count = 0

def on_message(self, event: Event):
def on_amqp_message(self, event: Event):
# just messages with banana filters get received
print(
"received message from stream: "
Expand Down Expand Up @@ -86,7 +87,7 @@ def main() -> None:
queue_name = "example-queue"

print("connection to amqp server")
environment = Environment("amqp://guest:guest@localhost:5672/")
environment = Environment("amqp://guest:guest@localhost:5672/", reconnect=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of reconnect=True, I would use something like:
https://github.com/rabbitmq/rabbitmq-amqp-go-client/blob/d84c3d22de34b767e55e68e4f0fc54137b506924/pkg/rabbitmqamqp/amqp_connection_recovery.go#L8

RecoveryConfiguration struct {
	/*
		ActiveRecovery Define if the recovery is activated.
		If is not activated the connection will not try to createSender.
	*/
	ActiveRecovery bool

	/*
		BackOffReconnectInterval The time to wait before trying to createSender after a connection is closed.
		time will be increased exponentially with each attempt.
		Default is 5 seconds, each attempt will double the time.
		The minimum value is 1 second. Avoid setting a value low values since it can cause a high
		number of reconnection attempts.
	*/
	BackOffReconnectInterval time.Duration

	/*
		MaxReconnectAttempts The maximum number of reconnection attempts.
		Default is 5.
		The minimum value is 1.
	*/
	MaxReconnectAttempts int

to make the reconnections more configurable

connection = create_connection(environment)

management = connection.management()
Expand Down Expand Up @@ -134,14 +135,22 @@ def main() -> None:

publisher.close()

try:
consumer.run()
except KeyboardInterrupt:
pass
while True:
try:
consumer.run()
except KeyboardInterrupt:
pass
except ConnectionClosed:
print("connection closed")
continue
except Exception as e:
print("consumer exited for exception " + str(e))

break

#
print("delete queue")
management.delete_queue(queue_name)
# management.delete_queue(queue_name)

print("closing connections")
management.close()
Expand Down
16 changes: 16 additions & 0 deletions rabbitmq_amqp_python_client/amqp_consumer_handler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .delivery_context import DeliveryContext
from .qpid.proton._events import Event
from .qpid.proton.handlers import MessagingHandler

"""
Expand All @@ -20,3 +21,18 @@ def __init__(self, auto_accept: bool = False, auto_settle: bool = True):
"""
super().__init__(auto_accept=auto_accept, auto_settle=auto_settle)
self.delivery_context: DeliveryContext = DeliveryContext()
self._offset = 0

def on_amqp_message(self, event: Event) -> None:
pass

def on_message(self, event: Event) -> None:
print("first level callback")
if "x-stream-offset" in event.message.annotations:
print("setting offset")
self._offset = int(event.message.annotations["x-stream-offset"])
self.on_amqp_message(event)

@property
def offset(self) -> int:
return self._offset
Loading