Skip to content

Commit 1fd97a3

Browse files
author
DanielePalaia
committed
multinode implementation
1 parent 1eebfad commit 1fd97a3

File tree

3 files changed

+44
-31
lines changed

3 files changed

+44
-31
lines changed

examples/getting_started/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def on_link_closed(self, event: Event) -> None:
5959

6060

6161
def create_connection() -> Connection:
62-
connection = Connection("amqps://guest:guest@localhost:5672/")
62+
connection = Connection("amqp://guest:guest@localhost:5672/")
6363
# in case of SSL enablement
6464
# ca_cert_file = ".ci/certs/ca_certificate.pem"
6565
# client_cert = ".ci/certs/client_certificate.pem"

examples/getting_started/reconnection_example.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
consumer = None
2222

2323

24-
def on_disconnected():
24+
# disconnection callback
25+
# here you can cleanup or reconnect
26+
def on_disconnection():
2527

2628
print("disconnected")
2729
exchange_name = "test-exchange"
@@ -36,7 +38,8 @@ def on_disconnected():
3638
addr = AddressHelper.exchange_address(exchange_name, routing_key)
3739
addr_queue = AddressHelper.queue_address(queue_name)
3840

39-
connection = create_connection()
41+
if connection is not None:
42+
connection = create_connection()
4043
if management is not None:
4144
management = connection.management()
4245
if publisher is not None:
@@ -93,12 +96,13 @@ def on_link_closed(self, event: Event) -> None:
9396
def create_connection() -> Connection:
9497
# for multinode specify a list of urls and fill the field urls of Connection instead of url
9598
# urls = [
96-
# "amqp://ha_tls-rabbit_node0-1:5602/",
97-
# "amqp://ha_tls-rabbit_node0-2:5602/",
98-
# "amqp://ha_tls-rabbit_node0-3:5602/",
99+
# "amqp://ha_tls-rabbit_node0-1:5682/",
100+
# "amqp://ha_tls-rabbit_node1-1:5692/",
101+
# "amqp://ha_tls-rabbit_node2-1:5602/",
99102
# ]
103+
# connection = Connection(urls=urls, on_disconnection_handler=on_disconnected)
100104
connection = Connection(
101-
"amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnected
105+
url="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection
102106
)
103107
connection.dial()
104108

rabbitmq_amqp_python_client/qpid/proton/_utils.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -431,37 +431,46 @@ def __init__(
431431
on_disconnection_handler: Optional[CB] = None,
432432
**kwargs
433433
) -> None:
434-
self.disconnected = False
435-
self.timeout = timeout or 60
436-
self.container = container or Container()
437-
self.container.timeout = self.timeout
438-
self.container.start()
439-
self.conn = None
440-
self.closing = False
441-
self._on_disconnection_handler = on_disconnection_handler
434+
442435
# Preserve previous behaviour if neither reconnect nor urls are supplied
443-
if url is not None and urls is None and reconnect is None:
444-
reconnect = False
445-
url = Url(url).defaults()
446-
failed = True
447-
try:
436+
if urls is None:
437+
urls = []
438+
urls.append(url)
439+
440+
# multinode reimplementation (default one wasn't working properly)
441+
attempt = 0
442+
for i in range(len(urls)):
443+
attempt = attempt + 1
444+
self.disconnected = False
445+
self.timeout = timeout or 60
446+
self.container = container or Container()
447+
self.container.timeout = self.timeout
448+
self.container.start()
449+
self.conn = None
450+
self.closing = False
451+
self._on_disconnection_handler = on_disconnection_handler
452+
453+
url_it = urls[i]
448454
self.conn = self.container.connect(
449-
url=url,
455+
url=Url(url_it).defaults(),
450456
handler=self,
451457
ssl_domain=ssl_domain,
452-
reconnect=reconnect,
458+
reconnect=False,
453459
heartbeat=heartbeat,
454-
urls=urls,
460+
urls=None,
455461
**kwargs
456462
)
457-
self.wait(
458-
lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
459-
msg="Opening connection",
460-
)
461-
failed = False
462-
finally:
463-
if failed and self.conn:
464-
self.close()
463+
try:
464+
self.wait(
465+
lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
466+
msg="Opening connection",
467+
)
468+
469+
except ConnectionException:
470+
self.conn.close()
471+
if attempt == len(urls):
472+
raise
473+
continue
465474

466475
def create_sender(
467476
self,

0 commit comments

Comments
 (0)