10
10
from DIRAC import gConfig
11
11
from DIRAC .Core .Utilities .ReturnValues import convertToReturnValue , returnValueOrRaise
12
12
13
+ # Setting for the reconnection handling by stomp interface.
14
+ # See e.g. the description of Transport class in
15
+ # https://github.com/jasonrbriggs/stomp.py/blob/master/stomp/transport.py
16
+
17
+ RECONNECT_SLEEP_INITIAL = 1 # [s] Initial delay before reattempting to establish a connection.
18
+ RECONNECT_SLEEP_INCREASE = 0.5 # Factor by which sleep delay is increased 0.5 means increase by 50%.
19
+ RECONNECT_SLEEP_MAX = 120 # [s] The maximum delay that can be reached independent of increasing procedure.
20
+ RECONNECT_SLEEP_JITTER = 0.1 # Random factor to add. 0.1 means a random number from 0 to 10% of the current time.
21
+ RECONNECT_ATTEMPTS_MAX = 1e4 # Maximum attempts to reconnect.
22
+
23
+ OUTGOING_HEARTBEAT_MS = 15_000
24
+ INCOMING_HEARTBEAT_MS = 15_000
25
+ STOMP_TIMEOUT = 60
26
+
27
+
28
+ DEFAULT_CONNECTION_KWARGS = {
29
+ "keepalive" : True ,
30
+ "timeout" : STOMP_TIMEOUT ,
31
+ "heartbeats" : (OUTGOING_HEARTBEAT_MS , INCOMING_HEARTBEAT_MS ),
32
+ "reconnect_sleep_initial" : RECONNECT_SLEEP_INITIAL ,
33
+ "reconnect_sleep_increase" : RECONNECT_SLEEP_INCREASE ,
34
+ "reconnect_sleep_max" : RECONNECT_SLEEP_MAX ,
35
+ "reconnect_sleep_jitter" : RECONNECT_SLEEP_JITTER ,
36
+ "reconnect_attempts_max" : RECONNECT_ATTEMPTS_MAX ,
37
+ }
38
+
13
39
14
40
def _resolve_brokers (alias : str , port : int , ipv4Only : bool = False , ipv6Only : bool = False ) -> list [tuple [str , int ]]:
15
41
"""
@@ -207,7 +233,9 @@ def _connectAndSubscribe(
207
233
as a callback to the reconnect listener
208
234
209
235
"""
210
-
236
+ # We need to explicitely call disconnect to avoid leaving
237
+ # threads behind
238
+ conn .disconnect ()
211
239
conn .connect (username = username , passcode = password , wait = True )
212
240
for dest in destinations :
213
241
subscribtionID = getSubscriptionID (broker , dest )
@@ -283,6 +311,7 @@ def __init__(self, host: str, port: int, username: str, password: str, destinati
283
311
:param kwargs: given to ~stomp.Connection constructor
284
312
"""
285
313
brokers = _resolve_brokers (host , port )
314
+
286
315
super ().__init__ (brokers , * args , ** kwargs )
287
316
288
317
self .connect (username , password , True )
@@ -306,6 +335,7 @@ def send(self, body, **kwargs):
306
335
try :
307
336
super ().send (self ._destination , body , ** kwargs )
308
337
except stomp .exception .StompException :
338
+ self .disconnect ()
309
339
self .connect (self ._username , self ._password , True )
310
340
else :
311
341
return True
@@ -400,6 +430,6 @@ def createProducer(
400
430
raise ValueError ("There should be exactly one destination given in parameter or in the CS" )
401
431
destination = csDestinations [0 ]
402
432
403
- producer = StompProducer (host , port , username , password , destination )
433
+ producer = StompProducer (host , port , username , password , destination , ** DEFAULT_CONNECTION_KWARGS )
404
434
405
435
return producer
0 commit comments