1010from DIRAC import gConfig
1111from DIRAC .Core .Utilities .ReturnValues import convertToReturnValue , returnValueOrRaise
1212
13+ # Setting for the reconnection handling by stomp interface.
14+ # See e.g. the description of Transport class in
15+ # https://github.com/jasonrbriggs/stomp.py/blob/master/stomp/transport.py
16+
17+ RECONNECT_SLEEP_INITIAL = 1 # [s] Initial delay before reattempting to establish a connection.
18+ RECONNECT_SLEEP_INCREASE = 0.5 # Factor by which sleep delay is increased 0.5 means increase by 50%.
19+ RECONNECT_SLEEP_MAX = 120 # [s] The maximum delay that can be reached independent of increasing procedure.
20+ RECONNECT_SLEEP_JITTER = 0.1 # Random factor to add. 0.1 means a random number from 0 to 10% of the current time.
21+ RECONNECT_ATTEMPTS_MAX = 1e4 # Maximum attempts to reconnect.
22+
23+ OUTGOING_HEARTBEAT_MS = 15_000
24+ INCOMING_HEARTBEAT_MS = 15_000
25+ STOMP_TIMEOUT = 60
26+
27+
28+ DEFAULT_CONNECTION_KWARGS = {
29+ "keepalive" : True ,
30+ "timeout" : STOMP_TIMEOUT ,
31+ "heartbeats" : (OUTGOING_HEARTBEAT_MS , INCOMING_HEARTBEAT_MS ),
32+ "reconnect_sleep_initial" : RECONNECT_SLEEP_INITIAL ,
33+ "reconnect_sleep_increase" : RECONNECT_SLEEP_INCREASE ,
34+ "reconnect_sleep_max" : RECONNECT_SLEEP_MAX ,
35+ "reconnect_sleep_jitter" : RECONNECT_SLEEP_JITTER ,
36+ "reconnect_attempts_max" : RECONNECT_ATTEMPTS_MAX ,
37+ }
38+
1339
1440def _resolve_brokers (alias : str , port : int , ipv4Only : bool = False , ipv6Only : bool = False ) -> list [tuple [str , int ]]:
1541 """
@@ -207,7 +233,9 @@ def _connectAndSubscribe(
207233 as a callback to the reconnect listener
208234
209235 """
210-
236+ # We need to explicitely call disconnect to avoid leaving
237+ # threads behind
238+ conn .disconnect ()
211239 conn .connect (username = username , passcode = password , wait = True )
212240 for dest in destinations :
213241 subscribtionID = getSubscriptionID (broker , dest )
@@ -283,6 +311,7 @@ def __init__(self, host: str, port: int, username: str, password: str, destinati
283311 :param kwargs: given to ~stomp.Connection constructor
284312 """
285313 brokers = _resolve_brokers (host , port )
314+
286315 super ().__init__ (brokers , * args , ** kwargs )
287316
288317 self .connect (username , password , True )
@@ -306,6 +335,7 @@ def send(self, body, **kwargs):
306335 try :
307336 super ().send (self ._destination , body , ** kwargs )
308337 except stomp .exception .StompException :
338+ self .disconnect ()
309339 self .connect (self ._username , self ._password , True )
310340 else :
311341 return True
@@ -400,6 +430,6 @@ def createProducer(
400430 raise ValueError ("There should be exactly one destination given in parameter or in the CS" )
401431 destination = csDestinations [0 ]
402432
403- producer = StompProducer (host , port , username , password , destination )
433+ producer = StompProducer (host , port , username , password , destination , ** DEFAULT_CONNECTION_KWARGS )
404434
405435 return producer
0 commit comments