1
"""The StalledJobAgent hunts for stalled jobs in the Job database. Jobs in
"running" state not receiving a heart beat signal for more than stalledTime
seconds will be assigned the "Stalled" state.

.. literalinclude:: ../ConfigTemplate.cfg
  :start-after: ##BEGIN StalledJobAgent
  :end-before: ##END
  :dedent: 2
  :caption: StalledJobAgent options
"""
13
11
import concurrent .futures
14
12
import datetime
32
30
33
31
34
32
class StalledJobAgent (AgentModule ):
35
- """Agent for setting Running jobs Stalled, and Stalled jobs Failed. And a few more."""
33
+ """Agent for setting Running jobs Stalled, and Stalled jobs Failed.
34
+
35
+ And a few more.
36
+ """
36
37
37
38
def __init__(self, *args, **kwargs):
    """Standard constructor: set the default values for the agent options."""
    super().__init__(*args, **kwargs)

    # DB handles — actually instantiated in initialize()
    self.jobDB = None
    self.logDB = None
    # Default timeouts in seconds; each can be overridden by an agent option
    # of the same name in initialize()
    self.matchedTime = 7200
    self.rescheduledTime = 600
    self.submittingTime = 300
    self.stalledJobsToleranceTime = 0
    # Site lists read from the agent options in initialize()
    self.stalledJobsTolerantSites = []
    self.stalledJobsToRescheduleSites = []
    # Thread pool for per-job checks — created in initialize()
    self.threadPoolExecutor = None
48
51
49
52
#############################################################################
50
53
def initialize (self ):
51
- """Sets default parameters"""
54
+ """Sets default parameters. """
52
55
self .jobDB = JobDB ()
53
56
self .logDB = JobLoggingDB ()
54
57
@@ -57,19 +60,21 @@ def initialize(self):
57
60
if not self .am_getOption ("Enable" , True ):
58
61
self .log .info ("Stalled Job Agent running in disabled mode" )
59
62
60
- wms_instance = getSystemInstance ("WorkloadManagement" )
61
- if not wms_instance :
63
+ wmsInstance = getSystemInstance ("WorkloadManagement" )
64
+ if not wmsInstance :
62
65
return S_ERROR ("Can not get the WorkloadManagement system instance" )
63
- self .stalledJobsTolerantSites = self .am_getOption ("StalledJobsTolerantSites" , [] )
64
- self .stalledJobsToleranceTime = self .am_getOption ("StalledJobsToleranceTime" , 0 )
66
+ self .stalledJobsTolerantSites = self .am_getOption ("StalledJobsTolerantSites" , self . stalledJobsTolerantSites )
67
+ self .stalledJobsToleranceTime = self .am_getOption ("StalledJobsToleranceTime" , self . stalledJobsToleranceTime )
65
68
66
- self .stalledJobsToRescheduleSites = self .am_getOption ("StalledJobsToRescheduleSites" , [])
69
+ self .stalledJobsToRescheduleSites = self .am_getOption (
70
+ "StalledJobsToRescheduleSites" , self .stalledJobsToRescheduleSites
71
+ )
67
72
68
73
self .submittingTime = self .am_getOption ("SubmittingTime" , self .submittingTime )
69
74
self .matchedTime = self .am_getOption ("MatchedTime" , self .matchedTime )
70
75
self .rescheduledTime = self .am_getOption ("RescheduledTime" , self .rescheduledTime )
71
76
72
- wrapperSection = cfgPath ("Systems" , "WorkloadManagement" , wms_instance , "JobWrapper" )
77
+ wrapperSection = cfgPath ("Systems" , "WorkloadManagement" , wmsInstance , "JobWrapper" )
73
78
74
79
failedTime = self .am_getOption ("FailedTimeHours" , 6 )
75
80
watchdogCycle = gConfig .getValue (cfgPath (wrapperSection , "CheckingTime" ), 30 * 60 )
@@ -89,14 +94,14 @@ def initialize(self):
89
94
90
95
# setting up the threading
91
96
maxNumberOfThreads = self .am_getOption ("MaxNumberOfThreads" , 15 )
92
- self .log .verbose ("Multithreaded with %d threads" % maxNumberOfThreads )
97
+ self .log .verbose (f "Multithreaded with { maxNumberOfThreads } threads" )
93
98
self .threadPoolExecutor = concurrent .futures .ThreadPoolExecutor (max_workers = maxNumberOfThreads )
94
99
95
100
return S_OK ()
96
101
97
102
#############################################################################
98
103
def execute (self ):
99
- """The main agent execution method"""
104
+ """The main agent execution method. """
100
105
# Now we are getting what's going to be checked
101
106
futures = []
102
107
@@ -162,22 +167,23 @@ def execute(self):
162
167
return S_OK ()
163
168
164
169
def finalize(self):
    """Graceful finalization: wait for the worker threads to drain before the agent stops."""

    self.log.info("Wait for threads to get empty before terminating the agent")
    # shutdown() blocks until all submitted futures have completed
    self.threadPoolExecutor.shutdown()
    self.log.info("Threads are empty, terminating the agent...")
    return S_OK()
171
176
172
177
def _execute(self, job_Op):
    """Doing the actual job.

    This is run inside the threads.

    :param str job_Op: "<jobID>:<methodName>" — the job to act on and the
        name of the method of this agent to call on it
    :return: None; failures are logged, not propagated
    """
    # Split only on the first ":" so a (hypothetical) extra colon in the
    # operation part cannot raise ValueError
    jobID, jobOp = job_Op.split(":", 1)
    jobID = int(jobID)
    # jobOp is already a plain attribute name; no f-string wrapping needed
    res = getattr(self, jobOp)(jobID)
    if not res["OK"]:
        self.log.error(f"Failure executing {jobOp}", f"on {jobID}: {res['Message']}")
181
187
182
188
#############################################################################
183
189
def _markStalledJobs (self , jobID ):
@@ -205,8 +211,8 @@ def _markStalledJobs(self, jobID):
205
211
206
212
#############################################################################
207
213
def _failStalledJobs (self , jobID ):
208
- """
209
- Changes the Stalled status to Failed for jobs long in the Stalled status.
214
+ """Changes the Stalled status to Failed for jobs long in the Stalled
215
+ status.
210
216
211
217
Run inside thread.
212
218
"""
@@ -215,7 +221,7 @@ def _failStalledJobs(self, jobID):
215
221
# Check if the job pilot is lost
216
222
result = self ._getJobPilotStatus (jobID )
217
223
if not result ["OK" ]:
218
- self .log .error ("Failed to get pilot status" , "for job %d: %s" % ( jobID , result [" Message" ]) )
224
+ self .log .error ("Failed to get pilot status" , f "for job { jobID } : { result [' Message' ] } " )
219
225
return result
220
226
pilotStatus = result ["Value" ]
221
227
if pilotStatus != "Running" :
@@ -224,7 +230,7 @@ def _failStalledJobs(self, jobID):
224
230
# Verify that there was no sign of life for long enough
225
231
result = self ._getLatestUpdateTime (jobID )
226
232
if not result ["OK" ]:
227
- self .log .error ("Failed to get job update time" , "for job %d: %s" % ( jobID , result [" Message" ]) )
233
+ self .log .error ("Failed to get job update time" , f "for job { jobID } : { result [' Message' ] } " )
228
234
return result
229
235
elapsedTime = toEpoch () - result ["Value" ]
230
236
if elapsedTime > self .failedTime :
@@ -233,7 +239,9 @@ def _failStalledJobs(self, jobID):
233
239
# Set the jobs Failed, send them a kill signal in case they are not really dead
234
240
# and send accounting info
235
241
if setFailed :
236
- self ._sendKillCommand (jobID ) # always returns None
242
+ res = self ._sendKillCommand (jobID )
243
+ if not res ["OK" ]:
244
+ self .log .error ("Failed to kill job" , jobID )
237
245
238
246
# For some sites we might want to reschedule rather than fail the jobs
239
247
if self .stalledJobsToRescheduleSites :
@@ -249,7 +257,7 @@ def _failStalledJobs(self, jobID):
249
257
return S_OK ()
250
258
251
259
def _getJobPilotStatus (self , jobID ):
252
- """Get the job pilot status"""
260
+ """Get the job pilot status. """
253
261
result = JobMonitoringClient ().getJobParameter (jobID , "Pilot_Reference" )
254
262
if not result ["OK" ]:
255
263
return result
@@ -261,9 +269,9 @@ def _getJobPilotStatus(self, jobID):
261
269
result = PilotManagerClient ().getPilotInfo (pilotReference )
262
270
if not result ["OK" ]:
263
271
if DErrno .cmpError (result , DErrno .EWMSNOPILOT ):
264
- self .log .warn ("No pilot found" , "for job %d: %s" % ( jobID , result [" Message" ]) )
272
+ self .log .warn ("No pilot found" , f "for job { jobID } : { result [' Message' ] } " )
265
273
return S_OK ("NoPilot" )
266
- self .log .error ("Failed to get pilot information" , "for job %d: %s" % ( jobID , result [" Message" ]) )
274
+ self .log .error ("Failed to get pilot information" , f "for job { jobID } : { result [' Message' ] } " )
267
275
return result
268
276
pilotStatus = result ["Value" ][pilotReference ]["Status" ]
269
277
@@ -272,8 +280,7 @@ def _getJobPilotStatus(self, jobID):
272
280
#############################################################################
273
281
def _checkJobStalled (self , job , stalledTime ):
274
282
"""Compares the most recent of LastUpdateTime and HeartBeatTime against
275
- the stalledTime limit.
276
- """
283
+ the stalledTime limit."""
277
284
result = self ._getLatestUpdateTime (job )
278
285
if not result ["OK" ]:
279
286
return result
@@ -290,7 +297,7 @@ def _checkJobStalled(self, job, stalledTime):
290
297
291
298
#############################################################################
292
299
def _getLatestUpdateTime (self , job ):
293
- """Returns the most recent of HeartBeatTime and LastUpdateTime"""
300
+ """Returns the most recent of HeartBeatTime and LastUpdateTime. """
294
301
result = self .jobDB .getJobAttributes (job , ["HeartBeatTime" , "LastUpdateTime" ])
295
302
if not result ["OK" ] or not result ["Value" ]:
296
303
self .log .error (
@@ -318,7 +325,7 @@ def _getLatestUpdateTime(self, job):
318
325
319
326
#############################################################################
320
327
def _updateJobStatus (self , job , status , minorStatus = None , force = False ):
321
- """This method updates the job status in the JobDB"""
328
+ """This method updates the job status in the JobDB. """
322
329
323
330
if not self .am_getOption ("Enable" , True ):
324
331
return S_OK ("Disabled" )
@@ -328,23 +335,21 @@ def _updateJobStatus(self, job, status, minorStatus=None, force=False):
328
335
self .log .debug (f"self.jobDB.setJobAttribute({ job } ,'Status','{ status } ',update=True)" )
329
336
result = self .jobDB .setJobAttribute (job , "Status" , status , update = True , force = force )
330
337
if not result ["OK" ]:
331
- self .log .error ("Failed setting Status" , "%s for job %d: %s" % ( status , job , result [" Message" ]) )
338
+ self .log .error ("Failed setting Status" , f" { status } for job { job } : { result [' Message' ] } " )
332
339
toRet = result
333
340
if minorStatus :
334
341
self .log .debug (f"self.jobDB.setJobAttribute({ job } ,'MinorStatus','{ minorStatus } ',update=True)" )
335
342
result = self .jobDB .setJobAttribute (job , "MinorStatus" , minorStatus , update = True )
336
343
if not result ["OK" ]:
337
- self .log .error (
338
- "Failed setting MinorStatus" , "%s for job %d: %s" % (minorStatus , job , result ["Message" ])
339
- )
344
+ self .log .error ("Failed setting MinorStatus" , f"{ minorStatus } for job { job } : { result ['Message' ]} " )
340
345
toRet = result
341
346
342
347
if not minorStatus : # Retain last minor status for stalled jobs
343
348
result = self .jobDB .getJobAttributes (job , ["MinorStatus" ])
344
349
if result ["OK" ]:
345
350
minorStatus = result ["Value" ]["MinorStatus" ]
346
351
else :
347
- self .log .error ("Failed getting MinorStatus" , "for job %d: %s" % ( job , result [" Message" ]) )
352
+ self .log .error ("Failed getting MinorStatus" , f "for job { job } : { result [' Message' ] } " )
348
353
minorStatus = "idem"
349
354
toRet = result
350
355
@@ -356,7 +361,8 @@ def _updateJobStatus(self, job, status, minorStatus=None, force=False):
356
361
return toRet
357
362
358
363
def _getProcessingType (self , jobID ):
359
- """Get the Processing Type from the JDL, until it is promoted to a real Attribute"""
364
+ """Get the Processing Type from the JDL, until it is promoted to a real
365
+ Attribute."""
360
366
processingType = "unknown"
361
367
result = self .jobDB .getJobJDL (jobID , original = True )
362
368
if not result ["OK" ]:
@@ -367,8 +373,7 @@ def _getProcessingType(self, jobID):
367
373
return processingType
368
374
369
375
def _sendAccounting (self , jobID ):
370
- """
371
- Send WMS accounting data for the given job.
376
+ """Send WMS accounting data for the given job.
372
377
373
378
Run inside thread.
374
379
"""
@@ -448,11 +453,11 @@ def _sendAccounting(self, jobID):
448
453
if result ["OK" ]:
449
454
self .jobDB .setJobAttribute (jobID , "AccountedFlag" , "True" )
450
455
else :
451
- self .log .error ("Failed to send accounting report" , "Job: %d, Error: %s" % ( int ( jobID ), result [" Message" ]) )
456
+ self .log .error ("Failed to send accounting report" , f"for job { jobID } : { result [' Message' ] } " )
452
457
return result
453
458
454
459
def _checkHeartBeat (self , jobID , jobDict ):
455
- """Get info from HeartBeat"""
460
+ """Get info from HeartBeat. """
456
461
result = self .jobDB .getHeartBeatData (jobID )
457
462
lastCPUTime = 0
458
463
lastWallTime = 0
@@ -482,7 +487,7 @@ def _checkHeartBeat(self, jobID, jobDict):
482
487
return lastCPUTime , lastWallTime , lastHeartBeatTime
483
488
484
489
def _checkLoggingInfo (self , jobID , jobDict ):
485
- """Get info from JobLogging"""
490
+ """Get info from JobLogging. """
486
491
logList = []
487
492
result = self .logDB .getJobLoggingInfo (jobID )
488
493
if result ["OK" ]:
@@ -516,7 +521,8 @@ def _checkLoggingInfo(self, jobID, jobDict):
516
521
return startTime , endTime
517
522
518
523
def _kickStuckJobs (self ):
519
- """Reschedule jobs stuck in initialization status Rescheduled, Matched"""
524
+ """Reschedule jobs stuck in initialization status Rescheduled,
525
+ Matched."""
520
526
521
527
message = ""
522
528
@@ -563,6 +569,7 @@ def _kickStuckJobs(self):
563
569
564
570
def _failSubmittingJobs (self ):
565
571
"""Failed Jobs stuck in Submitting Status for a long time.
572
+
566
573
They are due to a failed bulk submission transaction.
567
574
"""
568
575
@@ -587,11 +594,17 @@ def _sendKillCommand(self, job):
587
594
:param int job: ID of job to send kill command
588
595
"""
589
596
590
- owner = self .jobDB .getJobAttribute (job , "Owner" )["Value" ]
591
- ownerGroup = self .jobDB .getJobAttribute (job , "OwnerGroup" )["Value" ]
597
+ res = self .jobDB .getJobAttribute (job , "Owner" )
598
+ if not res ["OK" ]:
599
+ return res
600
+ owner = res ["Value" ]
601
+
602
+ res = self .jobDB .getJobAttribute (job , "OwnerGroup" )
603
+ if not res ["OK" ]:
604
+ return res
605
+ ownerGroup = res ["Value" ]
606
+
592
607
wmsClient = WMSClient (
593
608
useCertificates = True , delegatedDN = getDNForUsername (owner )["Value" ][0 ], delegatedGroup = ownerGroup
594
609
)
595
- resKill = wmsClient .killJob (job )
596
- if not resKill ["OK" ]:
597
- self .log .error ("Failed to send kill command to job" , f"{ job } : { resKill ['Message' ]} " )
610
+ return wmsClient .killJob (job )
0 commit comments