Skip to content

Commit 970b50f

Browse files
authored
Merge pull request #8027 from DIRACGridBot/cherry-pick-2-daf5207d2-integration
[sweep:integration] explicitly disconnect Stomp before reconnecting
2 parents d9da8cc + a4914ca commit 970b50f

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

src/DIRAC/Resources/MessageQueue/Simple/StompInterface.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,32 @@
1010
from DIRAC import gConfig
1111
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue, returnValueOrRaise
1212

13+
# Setting for the reconnection handling by stomp interface.
14+
# See e.g. the description of Transport class in
15+
# https://github.com/jasonrbriggs/stomp.py/blob/master/stomp/transport.py
16+
17+
RECONNECT_SLEEP_INITIAL = 1 # [s] Initial delay before reattempting to establish a connection.
18+
RECONNECT_SLEEP_INCREASE = 0.5 # Factor by which sleep delay is increased 0.5 means increase by 50%.
19+
RECONNECT_SLEEP_MAX = 120 # [s] The maximum delay that can be reached independent of increasing procedure.
20+
RECONNECT_SLEEP_JITTER = 0.1 # Random factor to add. 0.1 means a random number from 0 to 10% of the current time.
21+
RECONNECT_ATTEMPTS_MAX = 1e4 # Maximum attempts to reconnect.
22+
23+
OUTGOING_HEARTBEAT_MS = 15_000
24+
INCOMING_HEARTBEAT_MS = 15_000
25+
STOMP_TIMEOUT = 60
26+
27+
28+
DEFAULT_CONNECTION_KWARGS = {
29+
"keepalive": True,
30+
"timeout": STOMP_TIMEOUT,
31+
"heartbeats": (OUTGOING_HEARTBEAT_MS, INCOMING_HEARTBEAT_MS),
32+
"reconnect_sleep_initial": RECONNECT_SLEEP_INITIAL,
33+
"reconnect_sleep_increase": RECONNECT_SLEEP_INCREASE,
34+
"reconnect_sleep_max": RECONNECT_SLEEP_MAX,
35+
"reconnect_sleep_jitter": RECONNECT_SLEEP_JITTER,
36+
"reconnect_attempts_max": RECONNECT_ATTEMPTS_MAX,
37+
}
38+
1339

1440
def _resolve_brokers(alias: str, port: int, ipv4Only: bool = False, ipv6Only: bool = False) -> list[tuple[str, int]]:
1541
"""
@@ -207,7 +233,9 @@ def _connectAndSubscribe(
207233
as a callback to the reconnect listener
208234
209235
"""
210-
236+
# We need to explicitely call disconnect to avoid leaving
237+
# threads behind
238+
conn.disconnect()
211239
conn.connect(username=username, passcode=password, wait=True)
212240
for dest in destinations:
213241
subscribtionID = getSubscriptionID(broker, dest)
@@ -283,6 +311,7 @@ def __init__(self, host: str, port: int, username: str, password: str, destinati
283311
:param kwargs: given to ~stomp.Connection constructor
284312
"""
285313
brokers = _resolve_brokers(host, port)
314+
286315
super().__init__(brokers, *args, **kwargs)
287316

288317
self.connect(username, password, True)
@@ -306,6 +335,7 @@ def send(self, body, **kwargs):
306335
try:
307336
super().send(self._destination, body, **kwargs)
308337
except stomp.exception.StompException:
338+
self.disconnect()
309339
self.connect(self._username, self._password, True)
310340
else:
311341
return True
@@ -400,6 +430,6 @@ def createProducer(
400430
raise ValueError("There should be exactly one destination given in parameter or in the CS")
401431
destination = csDestinations[0]
402432

403-
producer = StompProducer(host, port, username, password, destination)
433+
producer = StompProducer(host, port, username, password, destination, **DEFAULT_CONNECTION_KWARGS)
404434

405435
return producer

src/DIRAC/Resources/MessageQueue/StompMQConnector.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ def connect(self, parameters=None):
197197

198198
for _ in range(10):
199199
try:
200+
# We need to explicitely call disconnect to avoid leaving
201+
# threads behind
202+
self.connection.disconnect()
200203
self.connection.connect(username=user, passcode=password, wait=True)
201204

202205
if self.connection.is_connected():

0 commit comments

Comments
 (0)